Commit 17adf8bd8b330527056f05cdeec7c0015eec60e5

Authored by Igor Kulikov
1 parent fd905987

Improved MQTT client and added default handler to handle reconnect subscriptions

@@ -98,33 +98,25 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -98,33 +98,25 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
98 } 98 }
99 99
100 private void invokeHandlersForIncomingPublish(MqttPublishMessage message) { 100 private void invokeHandlersForIncomingPublish(MqttPublishMessage message) {
101 - for (MqttSubscribtion subscribtion : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {  
102 - if (subscribtion.matches(message.variableHeader().topicName())) {  
103 - if (subscribtion.isOnce() && subscribtion.isCalled()) { 101 + boolean handlerInvoked = false;
  102 + for (MqttSubscription subscription : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
  103 + if (subscription.matches(message.variableHeader().topicName())) {
  104 + if (subscription.isOnce() && subscription.isCalled()) {
104 continue; 105 continue;
105 } 106 }
106 message.payload().markReaderIndex(); 107 message.payload().markReaderIndex();
107 - subscribtion.setCalled(true);  
108 - subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload());  
109 - if (subscribtion.isOnce()) {  
110 - this.client.off(subscribtion.getTopic(), subscribtion.getHandler()); 108 + subscription.setCalled(true);
  109 + subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
  110 + if (subscription.isOnce()) {
  111 + this.client.off(subscription.getTopic(), subscription.getHandler());
111 } 112 }
112 message.payload().resetReaderIndex(); 113 message.payload().resetReaderIndex();
  114 + handlerInvoked = true;
113 } 115 }
114 } 116 }
115 - /*Set<MqttSubscribtion> subscribtions = ImmutableSet.copyOf(this.client.getSubscriptions().get(message.variableHeader().topicName()));  
116 - for (MqttSubscribtion subscribtion : subscribtions) {  
117 - if(subscribtion.isOnce() && subscribtion.isCalled()){  
118 - continue;  
119 - }  
120 - message.payload().markReaderIndex();  
121 - subscribtion.setCalled(true);  
122 - subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload());  
123 - if(subscribtion.isOnce()){  
124 - this.client.off(subscribtion.getTopic(), subscribtion.getHandler());  
125 - }  
126 - message.payload().resetReaderIndex();  
127 - }*/ 117 + if (!handlerInvoked && client.getDefaultHandler() != null) {
  118 + client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload());
  119 + }
128 message.payload().release(); 120 message.payload().release();
129 } 121 }
130 122
@@ -133,7 +125,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -133,7 +125,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
133 case CONNECTION_ACCEPTED: 125 case CONNECTION_ACCEPTED:
134 this.connectFuture.setSuccess(new MqttConnectResult(true, MqttConnectReturnCode.CONNECTION_ACCEPTED, channel.closeFuture())); 126 this.connectFuture.setSuccess(new MqttConnectResult(true, MqttConnectReturnCode.CONNECTION_ACCEPTED, channel.closeFuture()));
135 127
136 - this.client.getPendingSubscribtions().entrySet().stream().filter((e) -> !e.getValue().isSent()).forEach((e) -> { 128 + this.client.getPendingSubscriptions().entrySet().stream().filter((e) -> !e.getValue().isSent()).forEach((e) -> {
137 channel.write(e.getValue().getSubscribeMessage()); 129 channel.write(e.getValue().getSubscribeMessage());
138 e.getValue().setSent(true); 130 e.getValue().setSent(true);
139 }); 131 });
@@ -148,6 +140,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -148,6 +140,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
148 } 140 }
149 }); 141 });
150 channel.flush(); 142 channel.flush();
  143 + if (this.client.isReconnect()) {
  144 + this.client.onSuccessfulReconnect();
  145 + }
151 break; 146 break;
152 147
153 case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: 148 case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
@@ -163,19 +158,19 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -163,19 +158,19 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
163 } 158 }
164 159
165 private void handleSubAck(MqttSubAckMessage message) { 160 private void handleSubAck(MqttSubAckMessage message) {
166 - MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscribtions().remove(message.variableHeader().messageId()); 161 + MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscriptions().remove(message.variableHeader().messageId());
167 if (pendingSubscription == null) { 162 if (pendingSubscription == null) {
168 return; 163 return;
169 } 164 }
170 pendingSubscription.onSubackReceived(); 165 pendingSubscription.onSubackReceived();
171 for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) { 166 for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
172 - MqttSubscribtion subscribtion = new MqttSubscribtion(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce()); 167 + MqttSubscription subscribtion = new MqttSubscription(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
173 this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion); 168 this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion);
174 this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion); 169 this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion);
175 } 170 }
176 this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic()); 171 this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());
177 172
178 - this.client.getServerSubscribtions().add(pendingSubscription.getTopic()); 173 + this.client.getServerSubscriptions().add(pendingSubscription.getTopic());
179 174
180 if (!pendingSubscription.getFuture().isDone()) { 175 if (!pendingSubscription.getFuture().isDone()) {
181 pendingSubscription.getFuture().setSuccess(null); 176 pendingSubscription.getFuture().setSuccess(null);
@@ -220,7 +215,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> @@ -220,7 +215,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
220 return; 215 return;
221 } 216 }
222 unsubscribtion.onUnsubackReceived(); 217 unsubscribtion.onUnsubackReceived();
223 - this.client.getServerSubscribtions().remove(unsubscribtion.getTopic()); 218 + this.client.getServerSubscriptions().remove(unsubscribtion.getTopic());
224 unsubscribtion.getFuture().setSuccess(null); 219 unsubscribtion.getFuture().setSuccess(null);
225 this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId()); 220 this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
226 } 221 }
@@ -172,24 +172,18 @@ public interface MqttClient { @@ -172,24 +172,18 @@ public interface MqttClient {
172 */ 172 */
173 MqttClientConfig getClientConfig(); 173 MqttClientConfig getClientConfig();
174 174
175 - /**  
176 - * Construct the MqttClientImpl with default config  
177 - */  
178 - static MqttClient create(){  
179 - return new MqttClientImpl();  
180 - }  
181 175
182 /** 176 /**
183 * Construct the MqttClientImpl with additional config. 177 * Construct the MqttClientImpl with additional config.
184 * This config can also be changed using the {@link #getClientConfig()} function 178 * This config can also be changed using the {@link #getClientConfig()} function
185 * 179 *
186 * @param config The config object to use while looking for settings 180 * @param config The config object to use while looking for settings
  181 + * @param defaultHandler The handler for incoming messages that do not match any topic subscriptions
187 */ 182 */
188 - static MqttClient create(MqttClientConfig config){  
189 - return new MqttClientImpl(config); 183 + static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler){
  184 + return new MqttClientImpl(config, defaultHandler);
190 } 185 }
191 186
192 -  
193 /** 187 /**
194 * Send disconnect and close channel 188 * Send disconnect and close channel
195 * 189 *
@@ -15,6 +15,8 @@ @@ -15,6 +15,8 @@
15 */ 15 */
16 package org.thingsboard.mqtt; 16 package org.thingsboard.mqtt;
17 17
  18 +import io.netty.channel.ChannelId;
  19 +
18 /** 20 /**
19 * Created by Valerii Sosliuk on 12/30/2017. 21 * Created by Valerii Sosliuk on 12/30/2017.
20 */ 22 */
@@ -25,5 +27,11 @@ public interface MqttClientCallback { @@ -25,5 +27,11 @@ public interface MqttClientCallback {
25 * 27 *
26 * @param cause the reason behind the loss of connection. 28 * @param cause the reason behind the loss of connection.
27 */ 29 */
28 - public void connectionLost(Throwable cause); 30 + void connectionLost(Throwable cause);
  31 +
  32 + /**
  33 + * This method is called when the connection to the server is recovered.
  34 + *
  35 + */
  36 + void onSuccessfulReconnect();
29 } 37 }
@@ -40,23 +40,26 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -40,23 +40,26 @@ import java.util.concurrent.atomic.AtomicInteger;
40 @SuppressWarnings({"WeakerAccess", "unused"}) 40 @SuppressWarnings({"WeakerAccess", "unused"})
41 final class MqttClientImpl implements MqttClient { 41 final class MqttClientImpl implements MqttClient {
42 42
43 - private final Set<String> serverSubscribtions = new HashSet<>(); 43 + private final Set<String> serverSubscriptions = new HashSet<>();
44 private final IntObjectHashMap<MqttPendingUnsubscribtion> pendingServerUnsubscribes = new IntObjectHashMap<>(); 44 private final IntObjectHashMap<MqttPendingUnsubscribtion> pendingServerUnsubscribes = new IntObjectHashMap<>();
45 private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>(); 45 private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>();
46 private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>(); 46 private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>();
47 - private final HashMultimap<String, MqttSubscribtion> subscriptions = HashMultimap.create();  
48 - private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscribtions = new IntObjectHashMap<>(); 47 + private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create();
  48 + private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscriptions = new IntObjectHashMap<>();
49 private final Set<String> pendingSubscribeTopics = new HashSet<>(); 49 private final Set<String> pendingSubscribeTopics = new HashSet<>();
50 - private final HashMultimap<MqttHandler, MqttSubscribtion> handlerToSubscribtion = HashMultimap.create(); 50 + private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create();
51 private final AtomicInteger nextMessageId = new AtomicInteger(1); 51 private final AtomicInteger nextMessageId = new AtomicInteger(1);
52 52
53 private final MqttClientConfig clientConfig; 53 private final MqttClientConfig clientConfig;
54 54
  55 + private final MqttHandler defaultHandler;
  56 +
55 private EventLoopGroup eventLoop; 57 private EventLoopGroup eventLoop;
56 58
57 - private Channel channel; 59 + private volatile Channel channel;
58 60
59 - private boolean disconnected = false; 61 + private volatile boolean disconnected = false;
  62 + private volatile boolean reconnect = false;
60 private String host; 63 private String host;
61 private int port; 64 private int port;
62 private MqttClientCallback callback; 65 private MqttClientCallback callback;
@@ -65,8 +68,9 @@ final class MqttClientImpl implements MqttClient { @@ -65,8 +68,9 @@ final class MqttClientImpl implements MqttClient {
65 /** 68 /**
66 * Construct the MqttClientImpl with default config 69 * Construct the MqttClientImpl with default config
67 */ 70 */
68 - public MqttClientImpl() { 71 + public MqttClientImpl(MqttHandler defaultHandler) {
69 this.clientConfig = new MqttClientConfig(); 72 this.clientConfig = new MqttClientConfig();
  73 + this.defaultHandler = defaultHandler;
70 } 74 }
71 75
72 /** 76 /**
@@ -75,8 +79,9 @@ final class MqttClientImpl implements MqttClient { @@ -75,8 +79,9 @@ final class MqttClientImpl implements MqttClient {
75 * 79 *
76 * @param clientConfig The config object to use while looking for settings 80 * @param clientConfig The config object to use while looking for settings
77 */ 81 */
78 - public MqttClientImpl(MqttClientConfig clientConfig) { 82 + public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler) {
79 this.clientConfig = clientConfig; 83 this.clientConfig = clientConfig;
  84 + this.defaultHandler = defaultHandler;
80 } 85 }
81 86
82 /** 87 /**
@@ -100,12 +105,15 @@ final class MqttClientImpl implements MqttClient { @@ -100,12 +105,15 @@ final class MqttClientImpl implements MqttClient {
100 */ 105 */
101 @Override 106 @Override
102 public Future<MqttConnectResult> connect(String host, int port) { 107 public Future<MqttConnectResult> connect(String host, int port) {
  108 + return connect(host, port, false);
  109 + }
  110 +
  111 + private Future<MqttConnectResult> connect(String host, int port, boolean reconnect) {
103 if (this.eventLoop == null) { 112 if (this.eventLoop == null) {
104 this.eventLoop = new NioEventLoopGroup(); 113 this.eventLoop = new NioEventLoopGroup();
105 } 114 }
106 this.host = host; 115 this.host = host;
107 this.port = port; 116 this.port = port;
108 -  
109 Promise<MqttConnectResult> connectFuture = new DefaultPromise<>(this.eventLoop.next()); 117 Promise<MqttConnectResult> connectFuture = new DefaultPromise<>(this.eventLoop.next());
110 Bootstrap bootstrap = new Bootstrap(); 118 Bootstrap bootstrap = new Bootstrap();
111 bootstrap.group(this.eventLoop); 119 bootstrap.group(this.eventLoop);
@@ -113,22 +121,47 @@ final class MqttClientImpl implements MqttClient { @@ -113,22 +121,47 @@ final class MqttClientImpl implements MqttClient {
113 bootstrap.remoteAddress(host, port); 121 bootstrap.remoteAddress(host, port);
114 bootstrap.handler(new MqttChannelInitializer(connectFuture, host, port, clientConfig.getSslContext())); 122 bootstrap.handler(new MqttChannelInitializer(connectFuture, host, port, clientConfig.getSslContext()));
115 ChannelFuture future = bootstrap.connect(); 123 ChannelFuture future = bootstrap.connect();
  124 +
116 future.addListener((ChannelFutureListener) f -> { 125 future.addListener((ChannelFutureListener) f -> {
117 if (f.isSuccess()) { 126 if (f.isSuccess()) {
118 MqttClientImpl.this.channel = f.channel(); 127 MqttClientImpl.this.channel = f.channel();
119 - } else if (clientConfig.isReconnect() && !disconnected) {  
120 - eventLoop.schedule((Runnable) () -> connect(host, port), 1L, TimeUnit.SECONDS); 128 + MqttClientImpl.this.channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> {
  129 + if (isConnected()) {
  130 + return;
  131 + }
  132 + ChannelClosedException e = new ChannelClosedException("Channel is closed!");
  133 + if (callback != null) {
  134 + callback.connectionLost(e);
  135 + }
  136 + pendingSubscriptions.clear();
  137 + serverSubscriptions.clear();
  138 + subscriptions.clear();
  139 + pendingServerUnsubscribes.clear();
  140 + qos2PendingIncomingPublishes.clear();
  141 + pendingPublishes.clear();
  142 + pendingSubscribeTopics.clear();
  143 + handlerToSubscribtion.clear();
  144 + scheduleConnectIfRequired(host, port, true);
  145 + });
  146 + } else {
  147 + scheduleConnectIfRequired(host, port, reconnect);
121 } 148 }
122 }); 149 });
123 return connectFuture; 150 return connectFuture;
124 } 151 }
125 152
  153 + private void scheduleConnectIfRequired(String host, int port, boolean reconnect) {
  154 + if (clientConfig.isReconnect() && !disconnected) {
  155 + if (reconnect) {
  156 + this.reconnect = true;
  157 + }
  158 + eventLoop.schedule((Runnable) () -> connect(host, port, reconnect), 1L, TimeUnit.SECONDS);
  159 + }
  160 + }
  161 +
126 @Override 162 @Override
127 public boolean isConnected() { 163 public boolean isConnected() {
128 - if (!disconnected) {  
129 - return channel == null ? false : channel.isActive();  
130 - };  
131 - return false; 164 + return !disconnected && channel != null && channel.isActive();
132 } 165 }
133 166
134 @Override 167 @Override
@@ -183,7 +216,7 @@ final class MqttClientImpl implements MqttClient { @@ -183,7 +216,7 @@ final class MqttClientImpl implements MqttClient {
183 */ 216 */
184 @Override 217 @Override
185 public Future<Void> on(String topic, MqttHandler handler, MqttQoS qos) { 218 public Future<Void> on(String topic, MqttHandler handler, MqttQoS qos) {
186 - return createSubscribtion(topic, handler, false, qos); 219 + return createSubscription(topic, handler, false, qos);
187 } 220 }
188 221
189 /** 222 /**
@@ -210,7 +243,7 @@ final class MqttClientImpl implements MqttClient { @@ -210,7 +243,7 @@ final class MqttClientImpl implements MqttClient {
210 */ 243 */
211 @Override 244 @Override
212 public Future<Void> once(String topic, MqttHandler handler, MqttQoS qos) { 245 public Future<Void> once(String topic, MqttHandler handler, MqttQoS qos) {
213 - return createSubscribtion(topic, handler, true, qos); 246 + return createSubscription(topic, handler, true, qos);
214 } 247 }
215 248
216 /** 249 /**
@@ -224,7 +257,7 @@ final class MqttClientImpl implements MqttClient { @@ -224,7 +257,7 @@ final class MqttClientImpl implements MqttClient {
224 @Override 257 @Override
225 public Future<Void> off(String topic, MqttHandler handler) { 258 public Future<Void> off(String topic, MqttHandler handler) {
226 Promise<Void> future = new DefaultPromise<>(this.eventLoop.next()); 259 Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
227 - for (MqttSubscribtion subscribtion : this.handlerToSubscribtion.get(handler)) { 260 + for (MqttSubscription subscribtion : this.handlerToSubscribtion.get(handler)) {
228 this.subscriptions.remove(topic, subscribtion); 261 this.subscriptions.remove(topic, subscribtion);
229 } 262 }
230 this.handlerToSubscribtion.removeAll(handler); 263 this.handlerToSubscribtion.removeAll(handler);
@@ -242,9 +275,9 @@ final class MqttClientImpl implements MqttClient { @@ -242,9 +275,9 @@ final class MqttClientImpl implements MqttClient {
242 @Override 275 @Override
243 public Future<Void> off(String topic) { 276 public Future<Void> off(String topic) {
244 Promise<Void> future = new DefaultPromise<>(this.eventLoop.next()); 277 Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
245 - ImmutableSet<MqttSubscribtion> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));  
246 - for (MqttSubscribtion subscribtion : subscribtions) {  
247 - for (MqttSubscribtion handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) { 278 + ImmutableSet<MqttSubscription> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));
  279 + for (MqttSubscription subscribtion : subscribtions) {
  280 + for (MqttSubscription handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) {
248 this.subscriptions.remove(topic, handSub); 281 this.subscriptions.remove(topic, handSub);
249 } 282 }
250 this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion); 283 this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion);
@@ -310,7 +343,7 @@ final class MqttClientImpl implements MqttClient { @@ -310,7 +343,7 @@ final class MqttClientImpl implements MqttClient {
310 ChannelFuture channelFuture = this.sendAndFlushPacket(message); 343 ChannelFuture channelFuture = this.sendAndFlushPacket(message);
311 344
312 if (channelFuture != null) { 345 if (channelFuture != null) {
313 - pendingPublish.setSent(channelFuture != null); 346 + pendingPublish.setSent(true);
314 if (channelFuture.cause() != null) { 347 if (channelFuture.cause() != null) {
315 future.setFailure(channelFuture.cause()); 348 future.setFailure(channelFuture.cause());
316 return future; 349 return future;
@@ -352,6 +385,15 @@ final class MqttClientImpl implements MqttClient { @@ -352,6 +385,15 @@ final class MqttClientImpl implements MqttClient {
352 385
353 ///////////////////////////////////////////// PRIVATE API ///////////////////////////////////////////// 386 ///////////////////////////////////////////// PRIVATE API /////////////////////////////////////////////
354 387
  388 + public boolean isReconnect() {
  389 + return reconnect;
  390 + }
  391 +
  392 + public void onSuccessfulReconnect() {
  393 + callback.onSuccessfulReconnect();
  394 + }
  395 +
  396 +
355 ChannelFuture sendAndFlushPacket(Object message) { 397 ChannelFuture sendAndFlushPacket(Object message) {
356 if (this.channel == null) { 398 if (this.channel == null) {
357 return null; 399 return null;
@@ -359,11 +401,7 @@ final class MqttClientImpl implements MqttClient { @@ -359,11 +401,7 @@ final class MqttClientImpl implements MqttClient {
359 if (this.channel.isActive()) { 401 if (this.channel.isActive()) {
360 return this.channel.writeAndFlush(message); 402 return this.channel.writeAndFlush(message);
361 } 403 }
362 - ChannelClosedException e = new ChannelClosedException("Channel is closed");  
363 - if (callback != null) {  
364 - callback.connectionLost(e);  
365 - }  
366 - return this.channel.newFailedFuture(e); 404 + return this.channel.newFailedFuture(new ChannelClosedException("Channel is closed!"));
367 } 405 }
368 406
369 private MqttMessageIdVariableHeader getNewMessageId() { 407 private MqttMessageIdVariableHeader getNewMessageId() {
@@ -371,16 +409,16 @@ final class MqttClientImpl implements MqttClient { @@ -371,16 +409,16 @@ final class MqttClientImpl implements MqttClient {
371 return MqttMessageIdVariableHeader.from(this.nextMessageId.getAndIncrement()); 409 return MqttMessageIdVariableHeader.from(this.nextMessageId.getAndIncrement());
372 } 410 }
373 411
374 - private Future<Void> createSubscribtion(String topic, MqttHandler handler, boolean once, MqttQoS qos) { 412 + private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
375 if (this.pendingSubscribeTopics.contains(topic)) { 413 if (this.pendingSubscribeTopics.contains(topic)) {
376 - Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscribtions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny(); 414 + Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscriptions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
377 if (subscribtionEntry.isPresent()) { 415 if (subscribtionEntry.isPresent()) {
378 subscribtionEntry.get().getValue().addHandler(handler, once); 416 subscribtionEntry.get().getValue().addHandler(handler, once);
379 return subscribtionEntry.get().getValue().getFuture(); 417 return subscribtionEntry.get().getValue().getFuture();
380 } 418 }
381 } 419 }
382 - if (this.serverSubscribtions.contains(topic)) {  
383 - MqttSubscribtion subscribtion = new MqttSubscribtion(topic, handler, once); 420 + if (this.serverSubscriptions.contains(topic)) {
  421 + MqttSubscription subscribtion = new MqttSubscription(topic, handler, once);
384 this.subscriptions.put(topic, subscribtion); 422 this.subscriptions.put(topic, subscribtion);
385 this.handlerToSubscribtion.put(handler, subscribtion); 423 this.handlerToSubscribtion.put(handler, subscribtion);
386 return this.channel.newSucceededFuture(); 424 return this.channel.newSucceededFuture();
@@ -395,7 +433,7 @@ final class MqttClientImpl implements MqttClient { @@ -395,7 +433,7 @@ final class MqttClientImpl implements MqttClient {
395 433
396 final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message); 434 final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message);
397 pendingSubscribtion.addHandler(handler, once); 435 pendingSubscribtion.addHandler(handler, once);
398 - this.pendingSubscribtions.put(variableHeader.messageId(), pendingSubscribtion); 436 + this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscribtion);
399 this.pendingSubscribeTopics.add(topic); 437 this.pendingSubscribeTopics.add(topic);
400 pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened 438 pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
401 439
@@ -405,7 +443,7 @@ final class MqttClientImpl implements MqttClient { @@ -405,7 +443,7 @@ final class MqttClientImpl implements MqttClient {
405 } 443 }
406 444
407 private void checkSubscribtions(String topic, Promise<Void> promise) { 445 private void checkSubscribtions(String topic, Promise<Void> promise) {
408 - if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscribtions.contains(topic)) { 446 + if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscriptions.contains(topic)) {
409 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); 447 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
410 MqttMessageIdVariableHeader variableHeader = getNewMessageId(); 448 MqttMessageIdVariableHeader variableHeader = getNewMessageId();
411 MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic)); 449 MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
@@ -421,11 +459,11 @@ final class MqttClientImpl implements MqttClient { @@ -421,11 +459,11 @@ final class MqttClientImpl implements MqttClient {
421 } 459 }
422 } 460 }
423 461
424 - IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscribtions() {  
425 - return pendingSubscribtions; 462 + IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscriptions() {
  463 + return pendingSubscriptions;
426 } 464 }
427 465
428 - HashMultimap<String, MqttSubscribtion> getSubscriptions() { 466 + HashMultimap<String, MqttSubscription> getSubscriptions() {
429 return subscriptions; 467 return subscriptions;
430 } 468 }
431 469
@@ -433,12 +471,12 @@ final class MqttClientImpl implements MqttClient { @@ -433,12 +471,12 @@ final class MqttClientImpl implements MqttClient {
433 return pendingSubscribeTopics; 471 return pendingSubscribeTopics;
434 } 472 }
435 473
436 - HashMultimap<MqttHandler, MqttSubscribtion> getHandlerToSubscribtion() { 474 + HashMultimap<MqttHandler, MqttSubscription> getHandlerToSubscribtion() {
437 return handlerToSubscribtion; 475 return handlerToSubscribtion;
438 } 476 }
439 477
440 - Set<String> getServerSubscribtions() {  
441 - return serverSubscribtions; 478 + Set<String> getServerSubscriptions() {
  479 + return serverSubscriptions;
442 } 480 }
443 481
444 IntObjectHashMap<MqttPendingUnsubscribtion> getPendingServerUnsubscribes() { 482 IntObjectHashMap<MqttPendingUnsubscribtion> getPendingServerUnsubscribes() {
@@ -481,4 +519,9 @@ final class MqttClientImpl implements MqttClient { @@ -481,4 +519,9 @@ final class MqttClientImpl implements MqttClient {
481 ch.pipeline().addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, connectFuture)); 519 ch.pipeline().addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, connectFuture));
482 } 520 }
483 } 521 }
  522 +
  523 + MqttHandler getDefaultHandler() {
  524 + return defaultHandler;
  525 + }
  526 +
484 } 527 }
netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscription.java renamed from netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttSubscribtion.java
@@ -17,7 +17,7 @@ package org.thingsboard.mqtt; @@ -17,7 +17,7 @@ package org.thingsboard.mqtt;
17 17
18 import java.util.regex.Pattern; 18 import java.util.regex.Pattern;
19 19
20 -final class MqttSubscribtion { 20 +final class MqttSubscription {
21 21
22 private final String topic; 22 private final String topic;
23 private final Pattern topicRegex; 23 private final Pattern topicRegex;
@@ -27,7 +27,7 @@ final class MqttSubscribtion { @@ -27,7 +27,7 @@ final class MqttSubscribtion {
27 27
28 private boolean called; 28 private boolean called;
29 29
30 - MqttSubscribtion(String topic, MqttHandler handler, boolean once) { 30 + MqttSubscription(String topic, MqttHandler handler, boolean once) {
31 if(topic == null){ 31 if(topic == null){
32 throw new NullPointerException("topic"); 32 throw new NullPointerException("topic");
33 } 33 }
@@ -65,7 +65,7 @@ final class MqttSubscribtion { @@ -65,7 +65,7 @@ final class MqttSubscribtion {
65 if (this == o) return true; 65 if (this == o) return true;
66 if (o == null || getClass() != o.getClass()) return false; 66 if (o == null || getClass() != o.getClass()) return false;
67 67
68 - MqttSubscribtion that = (MqttSubscribtion) o; 68 + MqttSubscription that = (MqttSubscription) o;
69 69
70 return once == that.once && topic.equals(that.topic) && handler.equals(that.handler); 70 return once == that.once && topic.equals(that.topic) && handler.equals(that.handler);
71 } 71 }
@@ -113,7 +113,7 @@ public class TbMqttNode implements TbNode { @@ -113,7 +113,7 @@ public class TbMqttNode implements TbNode {
113 } 113 }
114 config.setCleanSession(this.config.isCleanSession()); 114 config.setCleanSession(this.config.isCleanSession());
115 this.config.getCredentials().configure(config); 115 this.config.getCredentials().configure(config);
116 - MqttClient client = MqttClient.create(config); 116 + MqttClient client = MqttClient.create(config, null);
117 client.setEventLoop(this.eventLoopGroup); 117 client.setEventLoop(this.eventLoopGroup);
118 Future<MqttConnectResult> connectFuture = client.connect(this.config.getHost(), this.config.getPort()); 118 Future<MqttConnectResult> connectFuture = client.connect(this.config.getHost(), this.config.getPort());
119 MqttConnectResult result; 119 MqttConnectResult result;