Commit 9cdf3dd80c2f6af64d313969e2d1b74635a78d03
1 parent
51ac96d0
Improvements in MQTT publish processing
Showing
1 changed file
with
2 additions
and
1 deletions
@@ -340,6 +340,7 @@ final class MqttClientImpl implements MqttClient { | @@ -340,6 +340,7 @@ final class MqttClientImpl implements MqttClient { | ||
340 | MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId()); | 340 | MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId()); |
341 | MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload); | 341 | MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload); |
342 | MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos); | 342 | MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos); |
343 | + this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish); | ||
343 | ChannelFuture channelFuture = this.sendAndFlushPacket(message); | 344 | ChannelFuture channelFuture = this.sendAndFlushPacket(message); |
344 | 345 | ||
345 | if (channelFuture != null) { | 346 | if (channelFuture != null) { |
@@ -350,9 +351,9 @@ final class MqttClientImpl implements MqttClient { | @@ -350,9 +351,9 @@ final class MqttClientImpl implements MqttClient { | ||
350 | } | 351 | } |
351 | } | 352 | } |
352 | if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) { | 353 | if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) { |
354 | + this.pendingPublishes.remove(pendingPublish.getMessageId()); | ||
353 | pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 | 355 | pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 |
354 | } else if (pendingPublish.isSent()) { | 356 | } else if (pendingPublish.isSent()) { |
355 | - this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish); | ||
356 | pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); | 357 | pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); |
357 | } | 358 | } |
358 | return future; | 359 | return future; |