Commit 00b5d36e0bf8d8295463018537dbbd5744902b7b
Committed by
GitHub
1 parent
d94cb9ca
fix bug: Under high concurrency, Mqtt client nextMessageId exceeds the 0xffff limit (#2564)
The compareAndSet method and getAndIncrement method of the AtomicInteger class are atomic, but when these two methods are used at the same time, they are no longer atomic.
Showing
1 changed file
with
6 additions
and
2 deletions
@@ -407,8 +407,12 @@ final class MqttClientImpl implements MqttClient { | @@ -407,8 +407,12 @@ final class MqttClientImpl implements MqttClient { | ||
407 | } | 407 | } |
408 | 408 | ||
409 | private MqttMessageIdVariableHeader getNewMessageId() { | 409 | private MqttMessageIdVariableHeader getNewMessageId() { |
410 | - this.nextMessageId.compareAndSet(0xffff, 1); | ||
411 | - return MqttMessageIdVariableHeader.from(this.nextMessageId.getAndIncrement()); | 410 | + int messageId; |
411 | + synchronized (this.nextMessageId) { | ||
412 | + this.nextMessageId.compareAndSet(0xffff, 1); | ||
413 | + messageId = this.nextMessageId.getAndIncrement(); | ||
414 | + } | ||
415 | + return MqttMessageIdVariableHeader.from(messageId); | ||
412 | } | 416 | } |
413 | 417 | ||
414 | private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) { | 418 | private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) { |