Commit dda3124835bf559118abe71a4566216e3d1388c3
Committed by
GitHub
1 parent
507bd43a
fix bug: Resending the message causes the client to interrupt
In the mqtt 3.1 protocol or mqtt 3.1.1 protocol, except for messages of type pubish, the isDup of other messages must be zero,Sending a subscription message multiple times causes the client to reconnect,therefore, resending the subscription message with isDup true will cause the server to think that the message is abnormal, thus interrupting the connection with the client
Showing
1 changed file
with
3 additions
and
1 deletions
... | ... | @@ -19,6 +19,7 @@ import io.netty.channel.EventLoop; |
19 | 19 | import io.netty.handler.codec.mqtt.MqttFixedHeader; |
20 | 20 | import io.netty.handler.codec.mqtt.MqttMessage; |
21 | 21 | import io.netty.util.concurrent.ScheduledFuture; |
22 | +import io.netty.handler.codec.mqtt.MqttMessageType; | |
22 | 23 | |
23 | 24 | import java.util.concurrent.TimeUnit; |
24 | 25 | import java.util.function.BiConsumer; |
... | ... | @@ -44,7 +45,8 @@ final class RetransmissionHandler<T extends MqttMessage> { |
44 | 45 | private void startTimer(EventLoop eventLoop){ |
45 | 46 | this.timer = eventLoop.schedule(() -> { |
46 | 47 | this.timeout += 5; |
47 | - MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), true, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength()); | |
48 | + boolean isDup = this.originalMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH ? true : this.originalMessage.fixedHeader().isDup(); | |
49 | + MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), isDup, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength()); | |
48 | 50 | handler.accept(fixedHeader, originalMessage); |
49 | 51 | startTimer(eventLoop); |
50 | 52 | }, timeout, TimeUnit.SECONDS); | ... | ... |