Commit e7c4e7685196d805faebb0e28260a163c7c224d9

Authored by zbeacon
Committed by Andrew Shvayka
1 parent 037d1797

Added removing for timers in pendingPublishes on channel is closed

@@ -55,6 +55,7 @@ import java.util.concurrent.ConcurrentHashMap; @@ -55,6 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
55 import java.util.concurrent.ConcurrentMap; 55 import java.util.concurrent.ConcurrentMap;
56 import java.util.concurrent.TimeUnit; 56 import java.util.concurrent.TimeUnit;
57 import java.util.concurrent.atomic.AtomicInteger; 57 import java.util.concurrent.atomic.AtomicInteger;
  58 +import java.util.function.BiConsumer;
58 59
59 /** 60 /**
60 * Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times 61 * Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times
@@ -160,6 +161,7 @@ final class MqttClientImpl implements MqttClient { @@ -160,6 +161,7 @@ final class MqttClientImpl implements MqttClient {
160 subscriptions.clear(); 161 subscriptions.clear();
161 pendingServerUnsubscribes.clear(); 162 pendingServerUnsubscribes.clear();
162 qos2PendingIncomingPublishes.clear(); 163 qos2PendingIncomingPublishes.clear();
  164 + pendingPublishes.forEach((id, mqttPendingPublish) -> mqttPendingPublish.onChannelClosed());
163 pendingPublishes.clear(); 165 pendingPublishes.clear();
164 pendingSubscribeTopics.clear(); 166 pendingSubscribeTopics.clear();
165 handlerToSubscribtion.clear(); 167 handlerToSubscribtion.clear();
@@ -366,20 +368,24 @@ final class MqttClientImpl implements MqttClient { @@ -366,20 +368,24 @@ final class MqttClientImpl implements MqttClient {
366 ChannelFuture channelFuture = this.sendAndFlushPacket(message); 368 ChannelFuture channelFuture = this.sendAndFlushPacket(message);
367 369
368 if (channelFuture != null) { 370 if (channelFuture != null) {
369 - pendingPublish.setSent(true);  
370 - if (channelFuture.cause() != null) {  
371 - this.pendingPublishes.remove(pendingPublish.getMessageId());  
372 - future.setFailure(channelFuture.cause());  
373 - return future;  
374 - }  
375 - }  
376 - if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) {  
377 - this.pendingPublishes.remove(pendingPublish.getMessageId());  
378 - pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0  
379 - } else if (pendingPublish.isSent()) {  
380 - pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); 371 + channelFuture.addListener(result -> {
  372 + pendingPublish.setSent(true);
  373 + if (result.cause() != null) {
  374 + pendingPublishes.remove(pendingPublish.getMessageId());
  375 + future.setFailure(result.cause());
  376 + } else {
  377 + if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) {
  378 + pendingPublishes.remove(pendingPublish.getMessageId());
  379 + pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0
  380 + } else if (pendingPublish.isSent()) {
  381 + pendingPublish.startPublishRetransmissionTimer(eventLoop.next(), MqttClientImpl.this::sendAndFlushPacket);
  382 + } else {
  383 + pendingPublishes.remove(pendingPublish.getMessageId());
  384 + }
  385 + }
  386 + });
381 } else { 387 } else {
382 - this.pendingPublishes.remove(pendingPublish.getMessageId()); 388 + pendingPublishes.remove(pendingPublish.getMessageId());
383 } 389 }
384 return future; 390 return future;
385 } 391 }
@@ -98,4 +98,9 @@ final class MqttPendingPublish { @@ -98,4 +98,9 @@ final class MqttPendingPublish {
98 void onPubcompReceived() { 98 void onPubcompReceived() {
99 this.pubrelRetransmissionHandler.stop(); 99 this.pubrelRetransmissionHandler.stop();
100 } 100 }
  101 +
  102 + void onChannelClosed() {
  103 + this.publishRetransmissionHandler.stop();
  104 + this.pubrelRetransmissionHandler.stop();
  105 + }
101 } 106 }