Commit b310f69c07b48de4268dd4e93f2fbca397c7c8e6
Committed by
GitHub
Merge pull request #2836 from blackstar-baba/master
Follow mqtt 3.1.1 protocol number 3.3.1-2
Showing
1 changed file
with
6 additions
and
2 deletions
@@ -18,8 +18,9 @@ package org.thingsboard.mqtt; | @@ -18,8 +18,9 @@ package org.thingsboard.mqtt; | ||
18 | import io.netty.channel.EventLoop; | 18 | import io.netty.channel.EventLoop; |
19 | import io.netty.handler.codec.mqtt.MqttFixedHeader; | 19 | import io.netty.handler.codec.mqtt.MqttFixedHeader; |
20 | import io.netty.handler.codec.mqtt.MqttMessage; | 20 | import io.netty.handler.codec.mqtt.MqttMessage; |
21 | -import io.netty.util.concurrent.ScheduledFuture; | ||
22 | import io.netty.handler.codec.mqtt.MqttMessageType; | 21 | import io.netty.handler.codec.mqtt.MqttMessageType; |
22 | +import io.netty.handler.codec.mqtt.MqttQoS; | ||
23 | +import io.netty.util.concurrent.ScheduledFuture; | ||
23 | 24 | ||
24 | import java.util.concurrent.TimeUnit; | 25 | import java.util.concurrent.TimeUnit; |
25 | import java.util.function.BiConsumer; | 26 | import java.util.function.BiConsumer; |
@@ -45,7 +46,10 @@ final class RetransmissionHandler<T extends MqttMessage> { | @@ -45,7 +46,10 @@ final class RetransmissionHandler<T extends MqttMessage> { | ||
45 | private void startTimer(EventLoop eventLoop){ | 46 | private void startTimer(EventLoop eventLoop){ |
46 | this.timer = eventLoop.schedule(() -> { | 47 | this.timer = eventLoop.schedule(() -> { |
47 | this.timeout += 5; | 48 | this.timeout += 5; |
48 | - boolean isDup = this.originalMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH ? true : this.originalMessage.fixedHeader().isDup(); | 49 | + boolean isDup = this.originalMessage.fixedHeader().isDup(); |
50 | + if(this.originalMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH && this.originalMessage.fixedHeader().qosLevel() != MqttQoS.AT_MOST_ONCE){ | ||
51 | + isDup = true; | ||
52 | + } | ||
49 | MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), isDup, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength()); | 53 | MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), isDup, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength()); |
50 | handler.accept(fixedHeader, originalMessage); | 54 | handler.accept(fixedHeader, originalMessage); |
51 | startTimer(eventLoop); | 55 | startTimer(eventLoop); |