Commit 06f199b9362df2300ef95b5e55449f4d1fbb6ba3

Authored by zbeacon
Committed by Andrew Shvayka
1 parent e7c4e768

Added stopping for subscriptions handlers and for unsubscription handlers

@@ -156,9 +156,11 @@ final class MqttClientImpl implements MqttClient { @@ -156,9 +156,11 @@ final class MqttClientImpl implements MqttClient {
156 if (callback != null) { 156 if (callback != null) {
157 callback.connectionLost(e); 157 callback.connectionLost(e);
158 } 158 }
  159 + pendingSubscriptions.forEach((id, mqttPendingSubscription) -> mqttPendingSubscription.onChannelClosed());
159 pendingSubscriptions.clear(); 160 pendingSubscriptions.clear();
160 serverSubscriptions.clear(); 161 serverSubscriptions.clear();
161 subscriptions.clear(); 162 subscriptions.clear();
  163 + pendingServerUnsubscribes.forEach((id, mqttPendingServerUnsubscribes) -> mqttPendingServerUnsubscribes.onChannelClosed());
162 pendingServerUnsubscribes.clear(); 164 pendingServerUnsubscribes.clear();
163 qos2PendingIncomingPublishes.clear(); 165 qos2PendingIncomingPublishes.clear();
164 pendingPublishes.forEach((id, mqttPendingPublish) -> mqttPendingPublish.onChannelClosed()); 166 pendingPublishes.forEach((id, mqttPendingPublish) -> mqttPendingPublish.onChannelClosed());
@@ -24,7 +24,7 @@ import io.netty.util.concurrent.Promise; @@ -24,7 +24,7 @@ import io.netty.util.concurrent.Promise;
24 24
25 import java.util.function.Consumer; 25 import java.util.function.Consumer;
26 26
27 -final class MqttPendingPublish { 27 +final class MqttPendingPublish{
28 28
29 private final int messageId; 29 private final int messageId;
30 private final Promise<Void> future; 30 private final Promise<Void> future;
@@ -99,7 +99,7 @@ final class MqttPendingPublish { @@ -99,7 +99,7 @@ final class MqttPendingPublish {
99 this.pubrelRetransmissionHandler.stop(); 99 this.pubrelRetransmissionHandler.stop();
100 } 100 }
101 101
102 - void onChannelClosed() { 102 + void onChannelClosed(){
103 this.publishRetransmissionHandler.stop(); 103 this.publishRetransmissionHandler.stop();
104 this.pubrelRetransmissionHandler.stop(); 104 this.pubrelRetransmissionHandler.stop();
105 } 105 }
@@ -23,7 +23,7 @@ import java.util.HashSet; @@ -23,7 +23,7 @@ import java.util.HashSet;
23 import java.util.Set; 23 import java.util.Set;
24 import java.util.function.Consumer; 24 import java.util.function.Consumer;
25 25
26 -final class MqttPendingSubscription { 26 +final class MqttPendingSubscription{
27 27
28 private final Promise<Void> future; 28 private final Promise<Void> future;
29 private final String topic; 29 private final String topic;
@@ -99,4 +99,8 @@ final class MqttPendingSubscription { @@ -99,4 +99,8 @@ final class MqttPendingSubscription {
99 return once; 99 return once;
100 } 100 }
101 } 101 }
  102 +
  103 + void onChannelClosed(){
  104 + this.retransmissionHandler.stop();
  105 + }
102 } 106 }
@@ -21,7 +21,7 @@ import io.netty.util.concurrent.Promise; @@ -21,7 +21,7 @@ import io.netty.util.concurrent.Promise;
21 21
22 import java.util.function.Consumer; 22 import java.util.function.Consumer;
23 23
24 -final class MqttPendingUnsubscription { 24 +final class MqttPendingUnsubscription{
25 25
26 private final Promise<Void> future; 26 private final Promise<Void> future;
27 private final String topic; 27 private final String topic;
@@ -52,4 +52,8 @@ final class MqttPendingUnsubscription { @@ -52,4 +52,8 @@ final class MqttPendingUnsubscription {
52 void onUnsubackReceived(){ 52 void onUnsubackReceived(){
53 this.retransmissionHandler.stop(); 53 this.retransmissionHandler.stop();
54 } 54 }
  55 +
  56 + void onChannelClosed(){
  57 + this.retransmissionHandler.stop();
  58 + }
55 } 59 }