Commit 3b52f9817df6f2a2473120a1b294bbc5d1ed12ec

Authored by Andrew Shvayka
1 parent 5206a0e4

MessageId to PacketId refactoring

@@ -185,21 +185,21 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -185,21 +185,21 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
185 185
186 case AT_LEAST_ONCE: 186 case AT_LEAST_ONCE:
187 invokeHandlersForIncomingPublish(message); 187 invokeHandlersForIncomingPublish(message);
188 - if (message.variableHeader().messageId() != -1) { 188 + if (message.variableHeader().packetId() != -1) {
189 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); 189 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
190 - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId()); 190 + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
191 channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader)); 191 channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
192 } 192 }
193 break; 193 break;
194 194
195 case EXACTLY_ONCE: 195 case EXACTLY_ONCE:
196 - if (message.variableHeader().messageId() != -1) { 196 + if (message.variableHeader().packetId() != -1) {
197 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0); 197 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
198 - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId()); 198 + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().packetId());
199 MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader); 199 MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader);
200 200
201 MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage); 201 MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage);
202 - this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().messageId(), incomingQos2Publish); 202 + this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().packetId(), incomingQos2Publish);
203 message.payload().retain(); 203 message.payload().retain();
204 incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket); 204 incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
205 205
@@ -249,7 +249,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -249,7 +249,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
249 MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); 249 MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
250 this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish()); 250 this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
251 incomingQos2Publish.onPubrelReceived(); 251 incomingQos2Publish.onPubrelReceived();
252 - this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().messageId()); 252 + this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
253 } 253 }
254 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); 254 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
255 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId()); 255 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
@@ -339,7 +339,7 @@ final class MqttClientImpl implements MqttClient { @@ -339,7 +339,7 @@ final class MqttClientImpl implements MqttClient {
339 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0); 339 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
340 MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId()); 340 MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
341 MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload); 341 MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
342 - MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.messageId(), future, payload.retain(), message, qos); 342 + MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos);
343 ChannelFuture channelFuture = this.sendAndFlushPacket(message); 343 ChannelFuture channelFuture = this.sendAndFlushPacket(message);
344 344
345 if (channelFuture != null) { 345 if (channelFuture != null) {