Showing
1 changed file
with
35 additions
and
12 deletions
@@ -19,18 +19,39 @@ import com.google.common.collect.HashMultimap; | @@ -19,18 +19,39 @@ import com.google.common.collect.HashMultimap; | ||
19 | import com.google.common.collect.ImmutableSet; | 19 | import com.google.common.collect.ImmutableSet; |
20 | import io.netty.bootstrap.Bootstrap; | 20 | import io.netty.bootstrap.Bootstrap; |
21 | import io.netty.buffer.ByteBuf; | 21 | import io.netty.buffer.ByteBuf; |
22 | -import io.netty.channel.*; | 22 | +import io.netty.channel.Channel; |
23 | +import io.netty.channel.ChannelFuture; | ||
24 | +import io.netty.channel.ChannelFutureListener; | ||
25 | +import io.netty.channel.ChannelInitializer; | ||
26 | +import io.netty.channel.EventLoopGroup; | ||
23 | import io.netty.channel.nio.NioEventLoopGroup; | 27 | import io.netty.channel.nio.NioEventLoopGroup; |
24 | import io.netty.channel.socket.SocketChannel; | 28 | import io.netty.channel.socket.SocketChannel; |
25 | -import io.netty.handler.codec.mqtt.*; | 29 | +import io.netty.handler.codec.mqtt.MqttDecoder; |
30 | +import io.netty.handler.codec.mqtt.MqttEncoder; | ||
31 | +import io.netty.handler.codec.mqtt.MqttFixedHeader; | ||
32 | +import io.netty.handler.codec.mqtt.MqttMessage; | ||
33 | +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; | ||
34 | +import io.netty.handler.codec.mqtt.MqttMessageType; | ||
35 | +import io.netty.handler.codec.mqtt.MqttPublishMessage; | ||
36 | +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; | ||
37 | +import io.netty.handler.codec.mqtt.MqttQoS; | ||
38 | +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; | ||
39 | +import io.netty.handler.codec.mqtt.MqttSubscribePayload; | ||
40 | +import io.netty.handler.codec.mqtt.MqttTopicSubscription; | ||
41 | +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; | ||
42 | +import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; | ||
26 | import io.netty.handler.ssl.SslContext; | 43 | import io.netty.handler.ssl.SslContext; |
27 | import io.netty.handler.timeout.IdleStateHandler; | 44 | import io.netty.handler.timeout.IdleStateHandler; |
28 | -import io.netty.util.collection.IntObjectHashMap; | ||
29 | import io.netty.util.concurrent.DefaultPromise; | 45 | import io.netty.util.concurrent.DefaultPromise; |
30 | import io.netty.util.concurrent.Future; | 46 | import io.netty.util.concurrent.Future; |
31 | import io.netty.util.concurrent.Promise; | 47 | import io.netty.util.concurrent.Promise; |
32 | 48 | ||
33 | -import java.util.*; | 49 | +import java.util.Collections; |
50 | +import java.util.HashSet; | ||
51 | +import java.util.Map; | ||
52 | +import java.util.Optional; | ||
53 | +import java.util.Set; | ||
54 | +import java.util.concurrent.ConcurrentHashMap; | ||
34 | import java.util.concurrent.TimeUnit; | 55 | import java.util.concurrent.TimeUnit; |
35 | import java.util.concurrent.atomic.AtomicInteger; | 56 | import java.util.concurrent.atomic.AtomicInteger; |
36 | 57 | ||
@@ -41,11 +62,11 @@ import java.util.concurrent.atomic.AtomicInteger; | @@ -41,11 +62,11 @@ import java.util.concurrent.atomic.AtomicInteger; | ||
41 | final class MqttClientImpl implements MqttClient { | 62 | final class MqttClientImpl implements MqttClient { |
42 | 63 | ||
43 | private final Set<String> serverSubscriptions = new HashSet<>(); | 64 | private final Set<String> serverSubscriptions = new HashSet<>(); |
44 | - private final IntObjectHashMap<MqttPendingUnsubscription> pendingServerUnsubscribes = new IntObjectHashMap<>(); | ||
45 | - private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>(); | ||
46 | - private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>(); | 65 | + private final ConcurrentHashMap<Integer, MqttPendingUnsubscription> pendingServerUnsubscribes = new ConcurrentHashMap<>(); |
66 | + private final ConcurrentHashMap<Integer, MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new ConcurrentHashMap<>(); | ||
67 | + private final ConcurrentHashMap<Integer, MqttPendingPublish> pendingPublishes = new ConcurrentHashMap<>(); | ||
47 | private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create(); | 68 | private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create(); |
48 | - private final IntObjectHashMap<MqttPendingSubscription> pendingSubscriptions = new IntObjectHashMap<>(); | 69 | + private final ConcurrentHashMap<Integer, MqttPendingSubscription> pendingSubscriptions = new ConcurrentHashMap<>(); |
49 | private final Set<String> pendingSubscribeTopics = new HashSet<>(); | 70 | private final Set<String> pendingSubscribeTopics = new HashSet<>(); |
50 | private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create(); | 71 | private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create(); |
51 | private final AtomicInteger nextMessageId = new AtomicInteger(1); | 72 | private final AtomicInteger nextMessageId = new AtomicInteger(1); |
@@ -355,6 +376,8 @@ final class MqttClientImpl implements MqttClient { | @@ -355,6 +376,8 @@ final class MqttClientImpl implements MqttClient { | ||
355 | pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 | 376 | pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0 |
356 | } else if (pendingPublish.isSent()) { | 377 | } else if (pendingPublish.isSent()) { |
357 | pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); | 378 | pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); |
379 | + } else { | ||
380 | + this.pendingPublishes.remove(pendingPublish.getMessageId()); | ||
358 | } | 381 | } |
359 | return future; | 382 | return future; |
360 | } | 383 | } |
@@ -466,7 +489,7 @@ final class MqttClientImpl implements MqttClient { | @@ -466,7 +489,7 @@ final class MqttClientImpl implements MqttClient { | ||
466 | } | 489 | } |
467 | } | 490 | } |
468 | 491 | ||
469 | - IntObjectHashMap<MqttPendingSubscription> getPendingSubscriptions() { | 492 | + ConcurrentHashMap<Integer, MqttPendingSubscription> getPendingSubscriptions() { |
470 | return pendingSubscriptions; | 493 | return pendingSubscriptions; |
471 | } | 494 | } |
472 | 495 | ||
@@ -486,15 +509,15 @@ final class MqttClientImpl implements MqttClient { | @@ -486,15 +509,15 @@ final class MqttClientImpl implements MqttClient { | ||
486 | return serverSubscriptions; | 509 | return serverSubscriptions; |
487 | } | 510 | } |
488 | 511 | ||
489 | - IntObjectHashMap<MqttPendingUnsubscription> getPendingServerUnsubscribes() { | 512 | + ConcurrentHashMap<Integer, MqttPendingUnsubscription> getPendingServerUnsubscribes() { |
490 | return pendingServerUnsubscribes; | 513 | return pendingServerUnsubscribes; |
491 | } | 514 | } |
492 | 515 | ||
493 | - IntObjectHashMap<MqttPendingPublish> getPendingPublishes() { | 516 | + ConcurrentHashMap<Integer, MqttPendingPublish> getPendingPublishes() { |
494 | return pendingPublishes; | 517 | return pendingPublishes; |
495 | } | 518 | } |
496 | 519 | ||
497 | - IntObjectHashMap<MqttIncomingQos2Publish> getQos2PendingIncomingPublishes() { | 520 | + ConcurrentHashMap<Integer, MqttIncomingQos2Publish> getQos2PendingIncomingPublishes() { |
498 | return qos2PendingIncomingPublishes; | 521 | return qos2PendingIncomingPublishes; |
499 | } | 522 | } |
500 | 523 |