Commit 309598c2ac997044d7139184a2528db2b1d595c7

Authored by Igor Kulikov
1 parent 17adf8bd

subscribtion -> subscription typo fix.

@@ -158,15 +158,15 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -158,15 +158,15 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
158 } 158 }
159 159
160 private void handleSubAck(MqttSubAckMessage message) { 160 private void handleSubAck(MqttSubAckMessage message) {
161 - MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscriptions().remove(message.variableHeader().messageId()); 161 + MqttPendingSubscription pendingSubscription = this.client.getPendingSubscriptions().remove(message.variableHeader().messageId());
162 if (pendingSubscription == null) { 162 if (pendingSubscription == null) {
163 return; 163 return;
164 } 164 }
165 pendingSubscription.onSubackReceived(); 165 pendingSubscription.onSubackReceived();
166 - for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) {  
167 - MqttSubscription subscribtion = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());  
168 - this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion);  
169 - this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion); 166 + for (MqttPendingSubscription.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
  167 + MqttSubscription subscription = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
  168 + this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscription);
  169 + this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscription);
170 } 170 }
171 this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic()); 171 this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());
172 172
@@ -210,13 +210,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -210,13 +210,13 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
210 } 210 }
211 211
212 private void handleUnsuback(MqttUnsubAckMessage message) { 212 private void handleUnsuback(MqttUnsubAckMessage message) {
213 - MqttPendingUnsubscribtion unsubscribtion = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());  
214 - if (unsubscribtion == null) { 213 + MqttPendingUnsubscription unsubscription = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());
  214 + if (unsubscription == null) {
215 return; 215 return;
216 } 216 }
217 - unsubscribtion.onUnsubackReceived();  
218 - this.client.getServerSubscriptions().remove(unsubscribtion.getTopic());  
219 - unsubscribtion.getFuture().setSuccess(null); 217 + unsubscription.onUnsubackReceived();
  218 + this.client.getServerSubscriptions().remove(unsubscription.getTopic());
  219 + unsubscription.getFuture().setSuccess(null);
220 this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId()); 220 this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
221 } 221 }
222 222
@@ -92,7 +92,7 @@ public interface MqttClient { @@ -92,7 +92,7 @@ public interface MqttClient {
92 92
93 /** 93 /**
94 * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler 94 * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
95 - * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed 95 + * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
96 * 96 *
97 * @param topic The topic filter to subscribe to 97 * @param topic The topic filter to subscribe to
98 * @param handler The handler to invoke when we receive a message 98 * @param handler The handler to invoke when we receive a message
@@ -102,7 +102,7 @@ public interface MqttClient { @@ -102,7 +102,7 @@ public interface MqttClient {
102 102
103 /** 103 /**
104 * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler 104 * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
105 - * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed 105 + * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
106 * 106 *
107 * @param topic The topic filter to subscribe to 107 * @param topic The topic filter to subscribe to
108 * @param handler The handler to invoke when we receive a message 108 * @param handler The handler to invoke when we receive a message
@@ -112,7 +112,7 @@ public interface MqttClient { @@ -112,7 +112,7 @@ public interface MqttClient {
112 Future<Void> once(String topic, MqttHandler handler, MqttQoS qos); 112 Future<Void> once(String topic, MqttHandler handler, MqttQoS qos);
113 113
114 /** 114 /**
115 - * Remove the subscribtion for the given topic and handler 115 + * Remove the subscription for the given topic and handler
116 * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)} 116 * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
117 * 117 *
118 * @param topic The topic to unsubscribe for 118 * @param topic The topic to unsubscribe for
@@ -122,7 +122,7 @@ public interface MqttClient { @@ -122,7 +122,7 @@ public interface MqttClient {
122 Future<Void> off(String topic, MqttHandler handler); 122 Future<Void> off(String topic, MqttHandler handler);
123 123
124 /** 124 /**
125 - * Remove all subscribtions for the given topic. 125 + * Remove all subscriptions for the given topic.
126 * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)} 126 * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
127 * 127 *
128 * @param topic The topic to unsubscribe for 128 * @param topic The topic to unsubscribe for
@@ -41,11 +41,11 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -41,11 +41,11 @@ import java.util.concurrent.atomic.AtomicInteger;
41 final class MqttClientImpl implements MqttClient { 41 final class MqttClientImpl implements MqttClient {
42 42
43 private final Set<String> serverSubscriptions = new HashSet<>(); 43 private final Set<String> serverSubscriptions = new HashSet<>();
44 - private final IntObjectHashMap<MqttPendingUnsubscribtion> pendingServerUnsubscribes = new IntObjectHashMap<>(); 44 + private final IntObjectHashMap<MqttPendingUnsubscription> pendingServerUnsubscribes = new IntObjectHashMap<>();
45 private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>(); 45 private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>();
46 private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>(); 46 private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>();
47 private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create(); 47 private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create();
48 - private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscriptions = new IntObjectHashMap<>(); 48 + private final IntObjectHashMap<MqttPendingSubscription> pendingSubscriptions = new IntObjectHashMap<>();
49 private final Set<String> pendingSubscribeTopics = new HashSet<>(); 49 private final Set<String> pendingSubscribeTopics = new HashSet<>();
50 private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create(); 50 private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create();
51 private final AtomicInteger nextMessageId = new AtomicInteger(1); 51 private final AtomicInteger nextMessageId = new AtomicInteger(1);
@@ -221,7 +221,7 @@ final class MqttClientImpl implements MqttClient { @@ -221,7 +221,7 @@ final class MqttClientImpl implements MqttClient {
221 221
222 /** 222 /**
223 * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler 223 * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
224 - * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed 224 + * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
225 * 225 *
226 * @param topic The topic filter to subscribe to 226 * @param topic The topic filter to subscribe to
227 * @param handler The handler to invoke when we receive a message 227 * @param handler The handler to invoke when we receive a message
@@ -234,7 +234,7 @@ final class MqttClientImpl implements MqttClient { @@ -234,7 +234,7 @@ final class MqttClientImpl implements MqttClient {
234 234
235 /** 235 /**
236 * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler 236 * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
237 - * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed 237 + * This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
238 * 238 *
239 * @param topic The topic filter to subscribe to 239 * @param topic The topic filter to subscribe to
240 * @param handler The handler to invoke when we receive a message 240 * @param handler The handler to invoke when we receive a message
@@ -247,7 +247,7 @@ final class MqttClientImpl implements MqttClient { @@ -247,7 +247,7 @@ final class MqttClientImpl implements MqttClient {
247 } 247 }
248 248
249 /** 249 /**
250 - * Remove the subscribtion for the given topic and handler 250 + * Remove the subscription for the given topic and handler
251 * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)} 251 * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
252 * 252 *
253 * @param topic The topic to unsubscribe for 253 * @param topic The topic to unsubscribe for
@@ -257,8 +257,8 @@ final class MqttClientImpl implements MqttClient { @@ -257,8 +257,8 @@ final class MqttClientImpl implements MqttClient {
257 @Override 257 @Override
258 public Future<Void> off(String topic, MqttHandler handler) { 258 public Future<Void> off(String topic, MqttHandler handler) {
259 Promise<Void> future = new DefaultPromise<>(this.eventLoop.next()); 259 Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
260 - for (MqttSubscription subscribtion : this.handlerToSubscribtion.get(handler)) {  
261 - this.subscriptions.remove(topic, subscribtion); 260 + for (MqttSubscription subscription : this.handlerToSubscribtion.get(handler)) {
  261 + this.subscriptions.remove(topic, subscription);
262 } 262 }
263 this.handlerToSubscribtion.removeAll(handler); 263 this.handlerToSubscribtion.removeAll(handler);
264 this.checkSubscribtions(topic, future); 264 this.checkSubscribtions(topic, future);
@@ -266,7 +266,7 @@ final class MqttClientImpl implements MqttClient { @@ -266,7 +266,7 @@ final class MqttClientImpl implements MqttClient {
266 } 266 }
267 267
268 /** 268 /**
269 - * Remove all subscribtions for the given topic. 269 + * Remove all subscriptions for the given topic.
270 * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)} 270 * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
271 * 271 *
272 * @param topic The topic to unsubscribe for 272 * @param topic The topic to unsubscribe for
@@ -275,12 +275,12 @@ final class MqttClientImpl implements MqttClient { @@ -275,12 +275,12 @@ final class MqttClientImpl implements MqttClient {
275 @Override 275 @Override
276 public Future<Void> off(String topic) { 276 public Future<Void> off(String topic) {
277 Promise<Void> future = new DefaultPromise<>(this.eventLoop.next()); 277 Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
278 - ImmutableSet<MqttSubscription> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));  
279 - for (MqttSubscription subscribtion : subscribtions) {  
280 - for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) { 278 + ImmutableSet<MqttSubscription> subscriptions = ImmutableSet.copyOf(this.subscriptions.get(topic));
  279 + for (MqttSubscription subscription : subscriptions) {
  280 + for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscription.getHandler())) {
281 this.subscriptions.remove(topic, handSub); 281 this.subscriptions.remove(topic, handSub);
282 } 282 }
283 - this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion); 283 + this.handlerToSubscribtion.remove(subscription.getHandler(), subscription);
284 } 284 }
285 this.checkSubscribtions(topic, future); 285 this.checkSubscribtions(topic, future);
286 return future; 286 return future;
@@ -411,16 +411,16 @@ final class MqttClientImpl implements MqttClient { @@ -411,16 +411,16 @@ final class MqttClientImpl implements MqttClient {
411 411
412 private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) { 412 private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
413 if (this.pendingSubscribeTopics.contains(topic)) { 413 if (this.pendingSubscribeTopics.contains(topic)) {
414 - Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();  
415 - if (subscribtionEntry.isPresent()) {  
416 - subscribtionEntry.get().getValue().addHandler(handler, once);  
417 - return subscribtionEntry.get().getValue().getFuture(); 414 + Optional<Map.Entry<Integer, MqttPendingSubscription>> subscriptionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
  415 + if (subscriptionEntry.isPresent()) {
  416 + subscriptionEntry.get().getValue().addHandler(handler, once);
  417 + return subscriptionEntry.get().getValue().getFuture();
418 } 418 }
419 } 419 }
420 if (this.serverSubscriptions.contains(topic)) { 420 if (this.serverSubscriptions.contains(topic)) {
421 - MqttSubscription subscribtion = new MqttSubscription(topic, handler, once);  
422 - this.subscriptions.put(topic, subscribtion);  
423 - this.handlerToSubscribtion.put(handler, subscribtion); 421 + MqttSubscription subscription = new MqttSubscription(topic, handler, once);
  422 + this.subscriptions.put(topic, subscription);
  423 + this.handlerToSubscribtion.put(handler, subscription);
424 return this.channel.newSucceededFuture(); 424 return this.channel.newSucceededFuture();
425 } 425 }
426 426
@@ -431,13 +431,13 @@ final class MqttClientImpl implements MqttClient { @@ -431,13 +431,13 @@ final class MqttClientImpl implements MqttClient {
431 MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription)); 431 MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription));
432 MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload); 432 MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
433 433
434 - final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message);  
435 - pendingSubscribtion.addHandler(handler, once);  
436 - this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscribtion); 434 + final MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(future, topic, message);
  435 + pendingSubscription.addHandler(handler, once);
  436 + this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscription);
437 this.pendingSubscribeTopics.add(topic); 437 this.pendingSubscribeTopics.add(topic);
438 - pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened 438 + pendingSubscription.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
439 439
440 - pendingSubscribtion.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket); 440 + pendingSubscription.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
441 441
442 return future; 442 return future;
443 } 443 }
@@ -449,9 +449,9 @@ final class MqttClientImpl implements MqttClient { @@ -449,9 +449,9 @@ final class MqttClientImpl implements MqttClient {
449 MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic)); 449 MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
450 MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload); 450 MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload);
451 451
452 - MqttPendingUnsubscribtion pendingUnsubscribtion = new MqttPendingUnsubscribtion(promise, topic, message);  
453 - this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscribtion);  
454 - pendingUnsubscribtion.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket); 452 + MqttPendingUnsubscription pendingUnsubscription = new MqttPendingUnsubscription(promise, topic, message);
  453 + this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscription);
  454 + pendingUnsubscription.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
455 455
456 this.sendAndFlushPacket(message); 456 this.sendAndFlushPacket(message);
457 } else { 457 } else {
@@ -459,7 +459,7 @@ final class MqttClientImpl implements MqttClient { @@ -459,7 +459,7 @@ final class MqttClientImpl implements MqttClient {
459 } 459 }
460 } 460 }
461 461
462 - IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscriptions() { 462 + IntObjectHashMap<MqttPendingSubscription> getPendingSubscriptions() {
463 return pendingSubscriptions; 463 return pendingSubscriptions;
464 } 464 }
465 465
@@ -479,7 +479,7 @@ final class MqttClientImpl implements MqttClient { @@ -479,7 +479,7 @@ final class MqttClientImpl implements MqttClient {
479 return serverSubscriptions; 479 return serverSubscriptions;
480 } 480 }
481 481
482 - IntObjectHashMap<MqttPendingUnsubscribtion> getPendingServerUnsubscribes() { 482 + IntObjectHashMap<MqttPendingUnsubscription> getPendingServerUnsubscribes() {
483 return pendingServerUnsubscribes; 483 return pendingServerUnsubscribes;
484 } 484 }
485 485
netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscription.java renamed from netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingSubscribtion.java
@@ -23,7 +23,7 @@ import java.util.HashSet; @@ -23,7 +23,7 @@ import java.util.HashSet;
23 import java.util.Set; 23 import java.util.Set;
24 import java.util.function.Consumer; 24 import java.util.function.Consumer;
25 25
26 -final class MqttPendingSubscribtion { 26 +final class MqttPendingSubscription {
27 27
28 private final Promise<Void> future; 28 private final Promise<Void> future;
29 private final String topic; 29 private final String topic;
@@ -34,7 +34,7 @@ final class MqttPendingSubscribtion { @@ -34,7 +34,7 @@ final class MqttPendingSubscribtion {
34 34
35 private boolean sent = false; 35 private boolean sent = false;
36 36
37 - MqttPendingSubscribtion(Promise<Void> future, String topic, MqttSubscribeMessage message) { 37 + MqttPendingSubscription(Promise<Void> future, String topic, MqttSubscribeMessage message) {
38 this.future = future; 38 this.future = future;
39 this.topic = topic; 39 this.topic = topic;
40 this.subscribeMessage = message; 40 this.subscribeMessage = message;
netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscription.java renamed from netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttPendingUnsubscribtion.java
@@ -21,14 +21,14 @@ import io.netty.util.concurrent.Promise; @@ -21,14 +21,14 @@ import io.netty.util.concurrent.Promise;
21 21
22 import java.util.function.Consumer; 22 import java.util.function.Consumer;
23 23
24 -final class MqttPendingUnsubscribtion { 24 +final class MqttPendingUnsubscription {
25 25
26 private final Promise<Void> future; 26 private final Promise<Void> future;
27 private final String topic; 27 private final String topic;
28 28
29 private final RetransmissionHandler<MqttUnsubscribeMessage> retransmissionHandler = new RetransmissionHandler<>(); 29 private final RetransmissionHandler<MqttUnsubscribeMessage> retransmissionHandler = new RetransmissionHandler<>();
30 30
31 - MqttPendingUnsubscribtion(Promise<Void> future, String topic, MqttUnsubscribeMessage unsubscribeMessage) { 31 + MqttPendingUnsubscription(Promise<Void> future, String topic, MqttUnsubscribeMessage unsubscribeMessage) {
32 this.future = future; 32 this.future = future;
33 this.topic = topic; 33 this.topic = topic;
34 34