Commit 0468cf8cf5c6f0e9c21d1e9ff1e0b59662c612c0

Authored by Andrii Shvaika
1 parent 1affc60a

Fix duplication of MQTT packets in MQTT Client

@@ -365,7 +365,8 @@ final class MqttClientImpl implements MqttClient { @@ -365,7 +365,8 @@ final class MqttClientImpl implements MqttClient {
365 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0); 365 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
366 MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId()); 366 MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
367 MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload); 367 MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
368 - MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future, payload.retain(), message, qos); 368 + MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), future,
  369 + payload.retain(), message, qos, () -> !pendingPublishes.containsKey(variableHeader.packetId()));
369 this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish); 370 this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish);
370 ChannelFuture channelFuture = this.sendAndFlushPacket(message); 371 ChannelFuture channelFuture = this.sendAndFlushPacket(message);
371 372
@@ -471,7 +472,8 @@ final class MqttClientImpl implements MqttClient { @@ -471,7 +472,8 @@ final class MqttClientImpl implements MqttClient {
471 MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription)); 472 MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription));
472 MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload); 473 MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
473 474
474 - final MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(future, topic, message); 475 + final MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(future, topic, message,
  476 + () -> !pendingSubscriptions.containsKey(variableHeader.messageId()));
475 pendingSubscription.addHandler(handler, once); 477 pendingSubscription.addHandler(handler, once);
476 this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscription); 478 this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscription);
477 this.pendingSubscribeTopics.add(topic); 479 this.pendingSubscribeTopics.add(topic);
@@ -489,7 +491,8 @@ final class MqttClientImpl implements MqttClient { @@ -489,7 +491,8 @@ final class MqttClientImpl implements MqttClient {
489 MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic)); 491 MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
490 MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload); 492 MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload);
491 493
492 - MqttPendingUnsubscription pendingUnsubscription = new MqttPendingUnsubscription(promise, topic, message); 494 + MqttPendingUnsubscription pendingUnsubscription = new MqttPendingUnsubscription(promise, topic, message,
  495 + () -> !pendingServerUnsubscribes.containsKey(variableHeader.messageId()));
493 this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscription); 496 this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscription);
494 pendingUnsubscription.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); 497 pendingUnsubscription.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
495 498
@@ -24,7 +24,7 @@ import io.netty.util.concurrent.Promise; @@ -24,7 +24,7 @@ import io.netty.util.concurrent.Promise;
24 24
25 import java.util.function.Consumer; 25 import java.util.function.Consumer;
26 26
27 -final class MqttPendingPublish{ 27 +final class MqttPendingPublish {
28 28
29 private final int messageId; 29 private final int messageId;
30 private final Promise<Void> future; 30 private final Promise<Void> future;
@@ -32,19 +32,21 @@ final class MqttPendingPublish{ @@ -32,19 +32,21 @@ final class MqttPendingPublish{
32 private final MqttPublishMessage message; 32 private final MqttPublishMessage message;
33 private final MqttQoS qos; 33 private final MqttQoS qos;
34 34
35 - private final RetransmissionHandler<MqttPublishMessage> publishRetransmissionHandler = new RetransmissionHandler<>();  
36 - private final RetransmissionHandler<MqttMessage> pubrelRetransmissionHandler = new RetransmissionHandler<>(); 35 + private final RetransmissionHandler<MqttPublishMessage> publishRetransmissionHandler;
  36 + private final RetransmissionHandler<MqttMessage> pubrelRetransmissionHandler;
37 37
38 private boolean sent = false; 38 private boolean sent = false;
39 39
40 - MqttPendingPublish(int messageId, Promise<Void> future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) { 40 + MqttPendingPublish(int messageId, Promise<Void> future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos, PendingOperation operation) {
41 this.messageId = messageId; 41 this.messageId = messageId;
42 this.future = future; 42 this.future = future;
43 this.payload = payload; 43 this.payload = payload;
44 this.message = message; 44 this.message = message;
45 this.qos = qos; 45 this.qos = qos;
46 46
  47 + this.publishRetransmissionHandler = new RetransmissionHandler<>(operation);
47 this.publishRetransmissionHandler.setOriginalMessage(message); 48 this.publishRetransmissionHandler.setOriginalMessage(message);
  49 + this.pubrelRetransmissionHandler = new RetransmissionHandler<>(operation);
48 } 50 }
49 51
50 int getMessageId() { 52 int getMessageId() {
@@ -99,8 +101,11 @@ final class MqttPendingPublish{ @@ -99,8 +101,11 @@ final class MqttPendingPublish{
99 this.pubrelRetransmissionHandler.stop(); 101 this.pubrelRetransmissionHandler.stop();
100 } 102 }
101 103
102 - void onChannelClosed(){ 104 + void onChannelClosed() {
103 this.publishRetransmissionHandler.stop(); 105 this.publishRetransmissionHandler.stop();
104 this.pubrelRetransmissionHandler.stop(); 106 this.pubrelRetransmissionHandler.stop();
  107 + if (payload != null) {
  108 + payload.release();
  109 + }
105 } 110 }
106 } 111 }
@@ -23,22 +23,23 @@ import java.util.HashSet; @@ -23,22 +23,23 @@ import java.util.HashSet;
23 import java.util.Set; 23 import java.util.Set;
24 import java.util.function.Consumer; 24 import java.util.function.Consumer;
25 25
26 -final class MqttPendingSubscription{ 26 +final class MqttPendingSubscription {
27 27
28 private final Promise<Void> future; 28 private final Promise<Void> future;
29 private final String topic; 29 private final String topic;
30 private final Set<MqttPendingHandler> handlers = new HashSet<>(); 30 private final Set<MqttPendingHandler> handlers = new HashSet<>();
31 private final MqttSubscribeMessage subscribeMessage; 31 private final MqttSubscribeMessage subscribeMessage;
32 32
33 - private final RetransmissionHandler<MqttSubscribeMessage> retransmissionHandler = new RetransmissionHandler<>(); 33 + private final RetransmissionHandler<MqttSubscribeMessage> retransmissionHandler;
34 34
35 private boolean sent = false; 35 private boolean sent = false;
36 36
37 - MqttPendingSubscription(Promise<Void> future, String topic, MqttSubscribeMessage message) { 37 + MqttPendingSubscription(Promise<Void> future, String topic, MqttSubscribeMessage message, PendingOperation operation) {
38 this.future = future; 38 this.future = future;
39 this.topic = topic; 39 this.topic = topic;
40 this.subscribeMessage = message; 40 this.subscribeMessage = message;
41 41
  42 + this.retransmissionHandler = new RetransmissionHandler<>(operation);
42 this.retransmissionHandler.setOriginalMessage(message); 43 this.retransmissionHandler.setOriginalMessage(message);
43 } 44 }
44 45
@@ -62,7 +63,7 @@ final class MqttPendingSubscription{ @@ -62,7 +63,7 @@ final class MqttPendingSubscription{
62 return subscribeMessage; 63 return subscribeMessage;
63 } 64 }
64 65
65 - void addHandler(MqttHandler handler, boolean once){ 66 + void addHandler(MqttHandler handler, boolean once) {
66 this.handlers.add(new MqttPendingHandler(handler, once)); 67 this.handlers.add(new MqttPendingHandler(handler, once));
67 } 68 }
68 69
@@ -71,14 +72,14 @@ final class MqttPendingSubscription{ @@ -71,14 +72,14 @@ final class MqttPendingSubscription{
71 } 72 }
72 73
73 void startRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) { 74 void startRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
74 - if(this.sent){ //If the packet is sent, we can start the retransmit timer 75 + if (this.sent) { //If the packet is sent, we can start the retransmit timer
75 this.retransmissionHandler.setHandle((fixedHeader, originalMessage) -> 76 this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
76 sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload()))); 77 sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
77 this.retransmissionHandler.start(eventLoop); 78 this.retransmissionHandler.start(eventLoop);
78 } 79 }
79 } 80 }
80 81
81 - void onSubackReceived(){ 82 + void onSubackReceived() {
82 this.retransmissionHandler.stop(); 83 this.retransmissionHandler.stop();
83 } 84 }
84 85
@@ -100,7 +101,7 @@ final class MqttPendingSubscription{ @@ -100,7 +101,7 @@ final class MqttPendingSubscription{
100 } 101 }
101 } 102 }
102 103
103 - void onChannelClosed(){ 104 + void onChannelClosed() {
104 this.retransmissionHandler.stop(); 105 this.retransmissionHandler.stop();
105 } 106 }
106 } 107 }
@@ -26,12 +26,13 @@ final class MqttPendingUnsubscription{ @@ -26,12 +26,13 @@ final class MqttPendingUnsubscription{
26 private final Promise<Void> future; 26 private final Promise<Void> future;
27 private final String topic; 27 private final String topic;
28 28
29 - private final RetransmissionHandler<MqttUnsubscribeMessage> retransmissionHandler = new RetransmissionHandler<>(); 29 + private final RetransmissionHandler<MqttUnsubscribeMessage> retransmissionHandler;
30 30
31 - MqttPendingUnsubscription(Promise<Void> future, String topic, MqttUnsubscribeMessage unsubscribeMessage) { 31 + MqttPendingUnsubscription(Promise<Void> future, String topic, MqttUnsubscribeMessage unsubscribeMessage, PendingOperation operation) {
32 this.future = future; 32 this.future = future;
33 this.topic = topic; 33 this.topic = topic;
34 34
  35 + this.retransmissionHandler = new RetransmissionHandler<>(operation);
35 this.retransmissionHandler.setOriginalMessage(unsubscribeMessage); 36 this.retransmissionHandler.setOriginalMessage(unsubscribeMessage);
36 } 37 }
37 38
@@ -28,10 +28,10 @@ final class MqttSubscription { @@ -28,10 +28,10 @@ final class MqttSubscription {
28 private boolean called; 28 private boolean called;
29 29
30 MqttSubscription(String topic, MqttHandler handler, boolean once) { 30 MqttSubscription(String topic, MqttHandler handler, boolean once) {
31 - if(topic == null){ 31 + if (topic == null) {
32 throw new NullPointerException("topic"); 32 throw new NullPointerException("topic");
33 } 33 }
34 - if(handler == null){ 34 + if (handler == null) {
35 throw new NullPointerException("handler"); 35 throw new NullPointerException("handler");
36 } 36 }
37 this.topic = topic; 37 this.topic = topic;
@@ -56,7 +56,7 @@ final class MqttSubscription { @@ -56,7 +56,7 @@ final class MqttSubscription {
56 return called; 56 return called;
57 } 57 }
58 58
59 - boolean matches(String topic){ 59 + boolean matches(String topic) {
60 return this.topicRegex.matcher(topic).matches(); 60 return this.topicRegex.matcher(topic).matches();
61 } 61 }
62 62
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +public interface PendingOperation {
  19 +
  20 + boolean isCanceled();
  21 +
  22 +}
@@ -21,33 +21,43 @@ import io.netty.handler.codec.mqtt.MqttMessage; @@ -21,33 +21,43 @@ import io.netty.handler.codec.mqtt.MqttMessage;
21 import io.netty.handler.codec.mqtt.MqttMessageType; 21 import io.netty.handler.codec.mqtt.MqttMessageType;
22 import io.netty.handler.codec.mqtt.MqttQoS; 22 import io.netty.handler.codec.mqtt.MqttQoS;
23 import io.netty.util.concurrent.ScheduledFuture; 23 import io.netty.util.concurrent.ScheduledFuture;
  24 +import lombok.RequiredArgsConstructor;
24 25
25 import java.util.concurrent.TimeUnit; 26 import java.util.concurrent.TimeUnit;
26 import java.util.function.BiConsumer; 27 import java.util.function.BiConsumer;
27 28
  29 +@RequiredArgsConstructor
28 final class RetransmissionHandler<T extends MqttMessage> { 30 final class RetransmissionHandler<T extends MqttMessage> {
29 31
  32 + private volatile boolean stopped;
  33 + private final PendingOperation pendingOperation;
30 private ScheduledFuture<?> timer; 34 private ScheduledFuture<?> timer;
31 private int timeout = 10; 35 private int timeout = 10;
32 private BiConsumer<MqttFixedHeader, T> handler; 36 private BiConsumer<MqttFixedHeader, T> handler;
33 private T originalMessage; 37 private T originalMessage;
34 38
35 - void start(EventLoop eventLoop){  
36 - if(eventLoop == null){ 39 + void start(EventLoop eventLoop) {
  40 + if (eventLoop == null) {
37 throw new NullPointerException("eventLoop"); 41 throw new NullPointerException("eventLoop");
38 } 42 }
39 - if(this.handler == null){ 43 + if (this.handler == null) {
40 throw new NullPointerException("handler"); 44 throw new NullPointerException("handler");
41 } 45 }
42 this.timeout = 10; 46 this.timeout = 10;
43 this.startTimer(eventLoop); 47 this.startTimer(eventLoop);
44 } 48 }
45 49
46 - private void startTimer(EventLoop eventLoop){ 50 + private void startTimer(EventLoop eventLoop) {
  51 + if (stopped || pendingOperation.isCanceled()) {
  52 + return;
  53 + }
47 this.timer = eventLoop.schedule(() -> { 54 this.timer = eventLoop.schedule(() -> {
  55 + if (stopped || pendingOperation.isCanceled()) {
  56 + return;
  57 + }
48 this.timeout += 5; 58 this.timeout += 5;
49 boolean isDup = this.originalMessage.fixedHeader().isDup(); 59 boolean isDup = this.originalMessage.fixedHeader().isDup();
50 - if(this.originalMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH && this.originalMessage.fixedHeader().qosLevel() != MqttQoS.AT_MOST_ONCE){ 60 + if (this.originalMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH && this.originalMessage.fixedHeader().qosLevel() != MqttQoS.AT_MOST_ONCE) {
51 isDup = true; 61 isDup = true;
52 } 62 }
53 MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), isDup, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength()); 63 MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), isDup, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength());
@@ -56,8 +66,9 @@ final class RetransmissionHandler<T extends MqttMessage> { @@ -56,8 +66,9 @@ final class RetransmissionHandler<T extends MqttMessage> {
56 }, timeout, TimeUnit.SECONDS); 66 }, timeout, TimeUnit.SECONDS);
57 } 67 }
58 68
59 - void stop(){  
60 - if(this.timer != null){ 69 + void stop() {
  70 + stopped = true;
  71 + if (this.timer != null) {
61 this.timer.cancel(true); 72 this.timer.cancel(true);
62 } 73 }
63 } 74 }