Commit 0ea799d8e33e1d91a4f3fcf17ea034d82fe10ea0

Authored by Igor Kulikov
1 parent 829c9a28

Add Netty MQTT Client module.

Showing 25 changed files with 2083 additions and 17 deletions
  1 +.idea/
  2 +*.ipr
  3 +*.iws
  4 +*.ids
  5 +*.iml
  6 +logs
  7 +target
... ...
  1 +<!--
  2 +
  3 + Copyright © 2016-2018 The Thingsboard Authors
  4 +
  5 + Licensed under the Apache License, Version 2.0 (the "License");
  6 + you may not use this file except in compliance with the License.
  7 + You may obtain a copy of the License at
  8 +
  9 + http://www.apache.org/licenses/LICENSE-2.0
  10 +
  11 + Unless required by applicable law or agreed to in writing, software
  12 + distributed under the License is distributed on an "AS IS" BASIS,
  13 + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 + See the License for the specific language governing permissions and
  15 + limitations under the License.
  16 +
  17 +-->
  18 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  19 + <modelVersion>4.0.0</modelVersion>
  20 + <parent>
  21 + <groupId>org.thingsboard</groupId>
  22 + <version>1.4.1-SNAPSHOT</version>
  23 + <artifactId>thingsboard</artifactId>
  24 + </parent>
  25 + <groupId>org.thingsboard</groupId>
  26 + <artifactId>netty-mqtt</artifactId>
  27 + <version>1.4.1-SNAPSHOT</version>
  28 + <packaging>jar</packaging>
  29 +
  30 + <name>Netty MQTT Client</name>
  31 + <url>https://thingsboard.io</url>
  32 +
  33 + <properties>
  34 + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  35 + <main.dir>${basedir}/..</main.dir>
  36 + </properties>
  37 +
  38 + <dependencies>
  39 + <dependency>
  40 + <groupId>io.netty</groupId>
  41 + <artifactId>netty-codec-mqtt</artifactId>
  42 + </dependency>
  43 + <dependency>
  44 + <groupId>io.netty</groupId>
  45 + <artifactId>netty-handler</artifactId>
  46 + </dependency>
  47 + <dependency>
  48 + <groupId>com.google.code.findbugs</groupId>
  49 + <artifactId>jsr305</artifactId>
  50 + <version>3.0.1</version>
  51 + <optional>true</optional>
  52 + </dependency>
  53 + <dependency>
  54 + <groupId>com.google.guava</groupId>
  55 + <artifactId>guava</artifactId>
  56 + </dependency>
  57 + </dependencies>
  58 +
  59 + <distributionManagement>
  60 + <repository>
  61 + <id>jk-5-maven</id>
  62 + <name>jk-5's maven server</name>
  63 + <url>sftp://10.2.1.2/opt/maven</url>
  64 + </repository>
  65 + </distributionManagement>
  66 +
  67 + <build>
  68 + <extensions>
  69 + <extension>
  70 + <groupId>org.apache.maven.wagon</groupId>
  71 + <artifactId>wagon-ssh</artifactId>
  72 + <version>2.6</version>
  73 + </extension>
  74 + </extensions>
  75 + <plugins>
  76 + <plugin>
  77 + <groupId>org.apache.maven.plugins</groupId>
  78 + <artifactId>maven-compiler-plugin</artifactId>
  79 + <version>3.1</version>
  80 + <configuration>
  81 + <source>1.8</source>
  82 + <target>1.8</target>
  83 + </configuration>
  84 + </plugin>
  85 + <plugin>
  86 + <groupId>org.apache.maven.plugins</groupId>
  87 + <artifactId>maven-jar-plugin</artifactId>
  88 + <version>2.4</version>
  89 + <configuration>
  90 + <archive>
  91 + <manifest>
  92 + <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
  93 + </manifest>
  94 + </archive>
  95 + </configuration>
  96 + </plugin>
  97 + </plugins>
  98 + </build>
  99 +</project>
\ No newline at end of file
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +/**
  19 + * Created by Valerii Sosliuk on 12/26/2017.
  20 + */
  21 +public class ChannelClosedException extends RuntimeException {
  22 +
  23 + private static final long serialVersionUID = 6266638352424706909L;
  24 +
  25 + public ChannelClosedException() {
  26 + }
  27 +
  28 + public ChannelClosedException(String message) {
  29 + super(message);
  30 + }
  31 +
  32 + public ChannelClosedException(String message, Throwable cause) {
  33 + super(message, cause);
  34 + }
  35 +
  36 + public ChannelClosedException(Throwable cause) {
  37 + super(cause);
  38 + }
  39 +
  40 + public ChannelClosedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
  41 + super(message, cause, enableSuppression, writableStackTrace);
  42 + }
  43 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import com.google.common.collect.ImmutableSet;
  19 +import io.netty.channel.Channel;
  20 +import io.netty.channel.ChannelHandlerContext;
  21 +import io.netty.channel.SimpleChannelInboundHandler;
  22 +import io.netty.handler.codec.mqtt.*;
  23 +import io.netty.util.CharsetUtil;
  24 +import io.netty.util.concurrent.Promise;
  25 +
  26 +final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> {
  27 +
  28 + private final MqttClientImpl client;
  29 + private final Promise<MqttConnectResult> connectFuture;
  30 +
  31 + MqttChannelHandler(MqttClientImpl client, Promise<MqttConnectResult> connectFuture) {
  32 + this.client = client;
  33 + this.connectFuture = connectFuture;
  34 + }
  35 +
  36 + @Override
  37 + protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  38 + switch (msg.fixedHeader().messageType()) {
  39 + case CONNACK:
  40 + handleConack(ctx.channel(), (MqttConnAckMessage) msg);
  41 + break;
  42 + case SUBACK:
  43 + handleSubAck((MqttSubAckMessage) msg);
  44 + break;
  45 + case PUBLISH:
  46 + handlePublish(ctx.channel(), (MqttPublishMessage) msg);
  47 + break;
  48 + case UNSUBACK:
  49 + handleUnsuback((MqttUnsubAckMessage) msg);
  50 + break;
  51 + case PUBACK:
  52 + handlePuback((MqttPubAckMessage) msg);
  53 + break;
  54 + case PUBREC:
  55 + handlePubrec(ctx.channel(), msg);
  56 + break;
  57 + case PUBREL:
  58 + handlePubrel(ctx.channel(), msg);
  59 + break;
  60 + case PUBCOMP:
  61 + handlePubcomp(msg);
  62 + break;
  63 + }
  64 + }
  65 +
  66 + @Override
  67 + public void channelActive(ChannelHandlerContext ctx) throws Exception {
  68 + super.channelActive(ctx);
  69 +
  70 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
  71 + MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
  72 + this.client.getClientConfig().getProtocolVersion().protocolName(), // Protocol Name
  73 + this.client.getClientConfig().getProtocolVersion().protocolLevel(), // Protocol Level
  74 + this.client.getClientConfig().getUsername() != null, // Has Username
  75 + this.client.getClientConfig().getPassword() != null, // Has Password
  76 + this.client.getClientConfig().getLastWill() != null // Will Retain
  77 + && this.client.getClientConfig().getLastWill().isRetain(),
  78 + this.client.getClientConfig().getLastWill() != null // Will QOS
  79 + ? this.client.getClientConfig().getLastWill().getQos().value()
  80 + : 0,
  81 + this.client.getClientConfig().getLastWill() != null, // Has Will
  82 + this.client.getClientConfig().isCleanSession(), // Clean Session
  83 + this.client.getClientConfig().getTimeoutSeconds() // Timeout
  84 + );
  85 + MqttConnectPayload payload = new MqttConnectPayload(
  86 + this.client.getClientConfig().getClientId(),
  87 + this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getTopic() : null,
  88 + this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getMessage().getBytes(CharsetUtil.UTF_8) : null,
  89 + this.client.getClientConfig().getUsername(),
  90 + this.client.getClientConfig().getPassword() != null ? this.client.getClientConfig().getPassword().getBytes(CharsetUtil.UTF_8) : null
  91 + );
  92 + ctx.channel().writeAndFlush(new MqttConnectMessage(fixedHeader, variableHeader, payload));
  93 + }
  94 +
  95 + @Override
  96 + public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  97 + super.channelInactive(ctx);
  98 + }
  99 +
  100 + private void invokeHandlersForIncomingPublish(MqttPublishMessage message) {
  101 + for (MqttSubscribtion subscribtion : ImmutableSet.copyOf(this.client.getSubscriptions().values())) {
  102 + if (subscribtion.matches(message.variableHeader().topicName())) {
  103 + if (subscribtion.isOnce() && subscribtion.isCalled()) {
  104 + continue;
  105 + }
  106 + message.payload().markReaderIndex();
  107 + subscribtion.setCalled(true);
  108 + subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
  109 + if (subscribtion.isOnce()) {
  110 + this.client.off(subscribtion.getTopic(), subscribtion.getHandler());
  111 + }
  112 + message.payload().resetReaderIndex();
  113 + }
  114 + }
  115 + /*Set<MqttSubscribtion> subscribtions = ImmutableSet.copyOf(this.client.getSubscriptions().get(message.variableHeader().topicName()));
  116 + for (MqttSubscribtion subscribtion : subscribtions) {
  117 + if(subscribtion.isOnce() && subscribtion.isCalled()){
  118 + continue;
  119 + }
  120 + message.payload().markReaderIndex();
  121 + subscribtion.setCalled(true);
  122 + subscribtion.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
  123 + if(subscribtion.isOnce()){
  124 + this.client.off(subscribtion.getTopic(), subscribtion.getHandler());
  125 + }
  126 + message.payload().resetReaderIndex();
  127 + }*/
  128 + message.payload().release();
  129 + }
  130 +
  131 + private void handleConack(Channel channel, MqttConnAckMessage message) {
  132 + switch (message.variableHeader().connectReturnCode()) {
  133 + case CONNECTION_ACCEPTED:
  134 + this.connectFuture.setSuccess(new MqttConnectResult(true, MqttConnectReturnCode.CONNECTION_ACCEPTED, channel.closeFuture()));
  135 +
  136 + this.client.getPendingSubscribtions().entrySet().stream().filter((e) -> !e.getValue().isSent()).forEach((e) -> {
  137 + channel.write(e.getValue().getSubscribeMessage());
  138 + e.getValue().setSent(true);
  139 + });
  140 +
  141 + this.client.getPendingPublishes().forEach((id, publish) -> {
  142 + if (publish.isSent()) return;
  143 + channel.write(publish.getMessage());
  144 + publish.setSent(true);
  145 + if (publish.getQos() == MqttQoS.AT_MOST_ONCE) {
  146 + publish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0
  147 + this.client.getPendingPublishes().remove(publish.getMessageId());
  148 + }
  149 + });
  150 + channel.flush();
  151 + break;
  152 +
  153 + case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD:
  154 + case CONNECTION_REFUSED_IDENTIFIER_REJECTED:
  155 + case CONNECTION_REFUSED_NOT_AUTHORIZED:
  156 + case CONNECTION_REFUSED_SERVER_UNAVAILABLE:
  157 + case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION:
  158 + this.connectFuture.setSuccess(new MqttConnectResult(false, message.variableHeader().connectReturnCode(), channel.closeFuture()));
  159 + channel.close();
  160 + // Don't start reconnect logic here
  161 + break;
  162 + }
  163 + }
  164 +
  165 + private void handleSubAck(MqttSubAckMessage message) {
  166 + MqttPendingSubscribtion pendingSubscription = this.client.getPendingSubscribtions().remove(message.variableHeader().messageId());
  167 + if (pendingSubscription == null) {
  168 + return;
  169 + }
  170 + pendingSubscription.onSubackReceived();
  171 + for (MqttPendingSubscribtion.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
  172 + MqttSubscribtion subscribtion = new MqttSubscribtion(pendingSubscription.getTopic(), handler.getHandler(), handler.isOnce());
  173 + this.client.getSubscriptions().put(pendingSubscription.getTopic(), subscribtion);
  174 + this.client.getHandlerToSubscribtion().put(handler.getHandler(), subscribtion);
  175 + }
  176 + this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());
  177 +
  178 + this.client.getServerSubscribtions().add(pendingSubscription.getTopic());
  179 +
  180 + if (!pendingSubscription.getFuture().isDone()) {
  181 + pendingSubscription.getFuture().setSuccess(null);
  182 + }
  183 + }
  184 +
  185 + private void handlePublish(Channel channel, MqttPublishMessage message) {
  186 + switch (message.fixedHeader().qosLevel()) {
  187 + case AT_MOST_ONCE:
  188 + invokeHandlersForIncomingPublish(message);
  189 + break;
  190 +
  191 + case AT_LEAST_ONCE:
  192 + invokeHandlersForIncomingPublish(message);
  193 + if (message.variableHeader().messageId() != -1) {
  194 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
  195 + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId());
  196 + channel.writeAndFlush(new MqttPubAckMessage(fixedHeader, variableHeader));
  197 + }
  198 + break;
  199 +
  200 + case EXACTLY_ONCE:
  201 + if (message.variableHeader().messageId() != -1) {
  202 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
  203 + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(message.variableHeader().messageId());
  204 + MqttMessage pubrecMessage = new MqttMessage(fixedHeader, variableHeader);
  205 +
  206 + MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message, pubrecMessage);
  207 + this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().messageId(), incomingQos2Publish);
  208 + message.payload().retain();
  209 + incomingQos2Publish.startPubrecRetransmitTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
  210 +
  211 + channel.writeAndFlush(pubrecMessage);
  212 + }
  213 + break;
  214 + }
  215 + }
  216 +
  217 + private void handleUnsuback(MqttUnsubAckMessage message) {
  218 + MqttPendingUnsubscribtion unsubscribtion = this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());
  219 + if (unsubscribtion == null) {
  220 + return;
  221 + }
  222 + unsubscribtion.onUnsubackReceived();
  223 + this.client.getServerSubscribtions().remove(unsubscribtion.getTopic());
  224 + unsubscribtion.getFuture().setSuccess(null);
  225 + this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
  226 + }
  227 +
  228 + private void handlePuback(MqttPubAckMessage message) {
  229 + MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(message.variableHeader().messageId());
  230 + pendingPublish.getFuture().setSuccess(null);
  231 + pendingPublish.onPubackReceived();
  232 + this.client.getPendingPublishes().remove(message.variableHeader().messageId());
  233 + pendingPublish.getPayload().release();
  234 + }
  235 +
  236 + private void handlePubrec(Channel channel, MqttMessage message) {
  237 + MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
  238 + pendingPublish.onPubackReceived();
  239 +
  240 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
  241 + MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
  242 + MqttMessage pubrelMessage = new MqttMessage(fixedHeader, variableHeader);
  243 + channel.writeAndFlush(pubrelMessage);
  244 +
  245 + pendingPublish.setPubrelMessage(pubrelMessage);
  246 + pendingPublish.startPubrelRetransmissionTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
  247 + }
  248 +
  249 + private void handlePubrel(Channel channel, MqttMessage message) {
  250 + if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader) message.variableHeader()).messageId())) {
  251 + MqttIncomingQos2Publish incomingQos2Publish = this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
  252 + this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
  253 + incomingQos2Publish.onPubrelReceived();
  254 + this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().messageId());
  255 + }
  256 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
  257 + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) message.variableHeader()).messageId());
  258 + channel.writeAndFlush(new MqttMessage(fixedHeader, variableHeader));
  259 + }
  260 +
  261 + private void handlePubcomp(MqttMessage message) {
  262 + MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
  263 + MqttPendingPublish pendingPublish = this.client.getPendingPublishes().get(variableHeader.messageId());
  264 + pendingPublish.getFuture().setSuccess(null);
  265 + this.client.getPendingPublishes().remove(variableHeader.messageId());
  266 + pendingPublish.getPayload().release();
  267 + pendingPublish.onPubcompReceived();
  268 + }
  269 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.buffer.ByteBuf;
  19 +import io.netty.channel.Channel;
  20 +import io.netty.channel.EventLoopGroup;
  21 +import io.netty.channel.nio.NioEventLoopGroup;
  22 +import io.netty.handler.codec.mqtt.MqttQoS;
  23 +import io.netty.util.concurrent.Future;
  24 +
  25 +public interface MqttClient {
  26 +
  27 + /**
  28 + * Connect to the specified hostname/ip. By default uses port 1883.
  29 + * If you want to change the port number, see {@link #connect(String, int)}
  30 + *
  31 + * @param host The ip address or host to connect to
  32 + * @return A future which will be completed when the connection is opened and we received an CONNACK
  33 + */
  34 + Future<MqttConnectResult> connect(String host);
  35 +
  36 + /**
  37 + * Connect to the specified hostname/ip using the specified port
  38 + *
  39 + * @param host The ip address or host to connect to
  40 + * @param port The tcp port to connect to
  41 + * @return A future which will be completed when the connection is opened and we received an CONNACK
  42 + */
  43 + Future<MqttConnectResult> connect(String host, int port);
  44 +
  45 + /**
  46 + *
  47 + * @return boolean value indicating if channel is active
  48 + */
  49 + boolean isConnected();
  50 +
  51 + /**
  52 + * Attempt reconnect to the host that was attempted with {@link #connect(String, int)} method before
  53 + *
  54 + * @return A future which will be completed when the connection is opened and we received an CONNACK
  55 + * @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted
  56 + */
  57 + Future<MqttConnectResult> reconnect();
  58 +
  59 + /**
  60 + * Retrieve the netty {@link EventLoopGroup} we are using
  61 + * @return The netty {@link EventLoopGroup} we use for the connection
  62 + */
  63 + EventLoopGroup getEventLoop();
  64 +
  65 + /**
  66 + * By default we use the netty {@link NioEventLoopGroup}.
  67 + * If you change the EventLoopGroup to another type, make sure to change the {@link Channel} class using {@link MqttClientConfig#setChannelClass(Class)}
  68 + * If you want to force the MqttClient to use another {@link EventLoopGroup}, call this function before calling {@link #connect(String, int)}
  69 + *
  70 + * @param eventLoop The new eventloop to use
  71 + */
  72 + void setEventLoop(EventLoopGroup eventLoop);
  73 +
  74 + /**
  75 + * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
  76 + *
  77 + * @param topic The topic filter to subscribe to
  78 + * @param handler The handler to invoke when we receive a message
  79 + * @return A future which will be completed when the server acknowledges our subscribe request
  80 + */
  81 + Future<Void> on(String topic, MqttHandler handler);
  82 +
  83 + /**
  84 + * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
  85 + *
  86 + * @param topic The topic filter to subscribe to
  87 + * @param handler The handler to invoke when we receive a message
  88 + * @param qos The qos to request to the server
  89 + * @return A future which will be completed when the server acknowledges our subscribe request
  90 + */
  91 + Future<Void> on(String topic, MqttHandler handler, MqttQoS qos);
  92 +
  93 + /**
  94 + * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
  95 + * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
  96 + *
  97 + * @param topic The topic filter to subscribe to
  98 + * @param handler The handler to invoke when we receive a message
  99 + * @return A future which will be completed when the server acknowledges our subscribe request
  100 + */
  101 + Future<Void> once(String topic, MqttHandler handler);
  102 +
  103 + /**
  104 + * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
  105 + * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
  106 + *
  107 + * @param topic The topic filter to subscribe to
  108 + * @param handler The handler to invoke when we receive a message
  109 + * @param qos The qos to request to the server
  110 + * @return A future which will be completed when the server acknowledges our subscribe request
  111 + */
  112 + Future<Void> once(String topic, MqttHandler handler, MqttQoS qos);
  113 +
  114 + /**
  115 + * Remove the subscribtion for the given topic and handler
  116 + * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
  117 + *
  118 + * @param topic The topic to unsubscribe for
  119 + * @param handler The handler to unsubscribe
  120 + * @return A future which will be completed when the server acknowledges our unsubscribe request
  121 + */
  122 + Future<Void> off(String topic, MqttHandler handler);
  123 +
  124 + /**
  125 + * Remove all subscribtions for the given topic.
  126 + * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
  127 + *
  128 + * @param topic The topic to unsubscribe for
  129 + * @return A future which will be completed when the server acknowledges our unsubscribe request
  130 + */
  131 + Future<Void> off(String topic);
  132 +
  133 + /**
  134 + * Publish a message to the given payload
  135 + * @param topic The topic to publish to
  136 + * @param payload The payload to send
  137 + * @return A future which will be completed when the message is sent out of the MqttClient
  138 + */
  139 + Future<Void> publish(String topic, ByteBuf payload);
  140 +
  141 + /**
  142 + * Publish a message to the given payload, using the given qos
  143 + * @param topic The topic to publish to
  144 + * @param payload The payload to send
  145 + * @param qos The qos to use while publishing
  146 + * @return A future which will be completed when the message is delivered to the server
  147 + */
  148 + Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos);
  149 +
  150 + /**
  151 + * Publish a message to the given payload, using optional retain
  152 + * @param topic The topic to publish to
  153 + * @param payload The payload to send
  154 + * @param retain true if you want to retain the message on the server, false otherwise
  155 + * @return A future which will be completed when the message is sent out of the MqttClient
  156 + */
  157 + Future<Void> publish(String topic, ByteBuf payload, boolean retain);
  158 +
  159 + /**
  160 + * Publish a message to the given payload, using the given qos and optional retain
  161 + * @param topic The topic to publish to
  162 + * @param payload The payload to send
  163 + * @param qos The qos to use while publishing
  164 + * @param retain true if you want to retain the message on the server, false otherwise
  165 + * @return A future which will be completed when the message is delivered to the server
  166 + */
  167 + Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain);
  168 +
  169 + /**
  170 + * Retrieve the MqttClient configuration
  171 + * @return The {@link MqttClientConfig} instance we use
  172 + */
  173 + MqttClientConfig getClientConfig();
  174 +
  175 + /**
  176 + * Construct the MqttClientImpl with default config
  177 + */
  178 + static MqttClient create(){
  179 + return new MqttClientImpl();
  180 + }
  181 +
  182 + /**
  183 + * Construct the MqttClientImpl with additional config.
  184 + * This config can also be changed using the {@link #getClientConfig()} function
  185 + *
  186 + * @param config The config object to use while looking for settings
  187 + */
  188 + static MqttClient create(MqttClientConfig config){
  189 + return new MqttClientImpl(config);
  190 + }
  191 +
  192 +
  193 + /**
  194 + * Send disconnect and close channel
  195 + *
  196 + */
  197 + void disconnect();
  198 +
  199 + /**
  200 + * Sets the {@see #MqttClientCallback} object for this MqttClient
  201 + * @param callback The callback to be set
  202 + */
  203 + void setCallback(MqttClientCallback callback);
  204 +
  205 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +/**
  19 + * Created by Valerii Sosliuk on 12/30/2017.
  20 + */
  21 +public interface MqttClientCallback {
  22 +
  23 + /**
  24 + * This method is called when the connection to the server is lost.
  25 + *
  26 + * @param cause the reason behind the loss of connection.
  27 + */
  28 + public void connectionLost(Throwable cause);
  29 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.channel.Channel;
  19 +import io.netty.channel.socket.nio.NioSocketChannel;
  20 +import io.netty.handler.codec.mqtt.MqttVersion;
  21 +import io.netty.handler.ssl.SslContext;
  22 +
  23 +import javax.annotation.Nonnull;
  24 +import javax.annotation.Nullable;
  25 +import java.util.Random;
  26 +
  27 +@SuppressWarnings({"WeakerAccess", "unused"})
  28 +public final class MqttClientConfig {
  29 +
  30 + private final SslContext sslContext;
  31 + private final String randomClientId;
  32 +
  33 + private String clientId;
  34 + private int timeoutSeconds = 60;
  35 + private MqttVersion protocolVersion = MqttVersion.MQTT_3_1;
  36 + @Nullable private String username = null;
  37 + @Nullable private String password = null;
  38 + private boolean cleanSession = true;
  39 + @Nullable private MqttLastWill lastWill;
  40 + private Class<? extends Channel> channelClass = NioSocketChannel.class;
  41 +
  42 + private boolean reconnect = true;
  43 +
  44 + public MqttClientConfig() {
  45 + this(null);
  46 + }
  47 +
  48 + public MqttClientConfig(SslContext sslContext) {
  49 + this.sslContext = sslContext;
  50 + Random random = new Random();
  51 + String id = "netty-mqtt/";
  52 + String[] options = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".split("");
  53 + for(int i = 0; i < 8; i++){
  54 + id += options[random.nextInt(options.length)];
  55 + }
  56 + this.clientId = id;
  57 + this.randomClientId = id;
  58 + }
  59 +
  60 + @Nonnull
  61 + public String getClientId() {
  62 + return clientId;
  63 + }
  64 +
  65 + public void setClientId(@Nullable String clientId) {
  66 + if(clientId == null){
  67 + this.clientId = randomClientId;
  68 + }else{
  69 + this.clientId = clientId;
  70 + }
  71 + }
  72 +
  73 + public int getTimeoutSeconds() {
  74 + return timeoutSeconds;
  75 + }
  76 +
  77 + public void setTimeoutSeconds(int timeoutSeconds) {
  78 + if(timeoutSeconds != -1 && timeoutSeconds <= 0){
  79 + throw new IllegalArgumentException("timeoutSeconds must be > 0 or -1");
  80 + }
  81 + this.timeoutSeconds = timeoutSeconds;
  82 + }
  83 +
  84 + public MqttVersion getProtocolVersion() {
  85 + return protocolVersion;
  86 + }
  87 +
  88 + public void setProtocolVersion(MqttVersion protocolVersion) {
  89 + if(protocolVersion == null){
  90 + throw new NullPointerException("protocolVersion");
  91 + }
  92 + this.protocolVersion = protocolVersion;
  93 + }
  94 +
  95 + @Nullable
  96 + public String getUsername() {
  97 + return username;
  98 + }
  99 +
  100 + public void setUsername(@Nullable String username) {
  101 + this.username = username;
  102 + }
  103 +
  104 + @Nullable
  105 + public String getPassword() {
  106 + return password;
  107 + }
  108 +
  109 + public void setPassword(@Nullable String password) {
  110 + this.password = password;
  111 + }
  112 +
  113 + public boolean isCleanSession() {
  114 + return cleanSession;
  115 + }
  116 +
  117 + public void setCleanSession(boolean cleanSession) {
  118 + this.cleanSession = cleanSession;
  119 + }
  120 +
  121 + @Nullable
  122 + public MqttLastWill getLastWill() {
  123 + return lastWill;
  124 + }
  125 +
  126 + public void setLastWill(@Nullable MqttLastWill lastWill) {
  127 + this.lastWill = lastWill;
  128 + }
  129 +
  130 + public Class<? extends Channel> getChannelClass() {
  131 + return channelClass;
  132 + }
  133 +
  134 + public void setChannelClass(Class<? extends Channel> channelClass) {
  135 + this.channelClass = channelClass;
  136 + }
  137 +
  138 + public SslContext getSslContext() {
  139 + return sslContext;
  140 + }
  141 +
  142 + public boolean isReconnect() {
  143 + return reconnect;
  144 + }
  145 +
  146 + public void setReconnect(boolean reconnect) {
  147 + this.reconnect = reconnect;
  148 + }
  149 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import com.google.common.collect.HashMultimap;
  19 +import com.google.common.collect.ImmutableSet;
  20 +import io.netty.bootstrap.Bootstrap;
  21 +import io.netty.buffer.ByteBuf;
  22 +import io.netty.channel.*;
  23 +import io.netty.channel.nio.NioEventLoopGroup;
  24 +import io.netty.channel.socket.SocketChannel;
  25 +import io.netty.handler.codec.mqtt.*;
  26 +import io.netty.handler.ssl.SslContext;
  27 +import io.netty.handler.timeout.IdleStateHandler;
  28 +import io.netty.util.collection.IntObjectHashMap;
  29 +import io.netty.util.concurrent.DefaultPromise;
  30 +import io.netty.util.concurrent.Future;
  31 +import io.netty.util.concurrent.Promise;
  32 +
  33 +import java.util.*;
  34 +import java.util.concurrent.TimeUnit;
  35 +import java.util.concurrent.atomic.AtomicInteger;
  36 +
  37 +/**
  38 + * Represents an MqttClientImpl connected to a single MQTT server. Will try to keep the connection going at all times
  39 + */
  40 +@SuppressWarnings({"WeakerAccess", "unused"})
  41 +final class MqttClientImpl implements MqttClient {
  42 +
  43 + private final Set<String> serverSubscribtions = new HashSet<>();
  44 + private final IntObjectHashMap<MqttPendingUnsubscribtion> pendingServerUnsubscribes = new IntObjectHashMap<>();
  45 + private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>();
  46 + private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>();
  47 + private final HashMultimap<String, MqttSubscribtion> subscriptions = HashMultimap.create();
  48 + private final IntObjectHashMap<MqttPendingSubscribtion> pendingSubscribtions = new IntObjectHashMap<>();
  49 + private final Set<String> pendingSubscribeTopics = new HashSet<>();
  50 + private final HashMultimap<MqttHandler, MqttSubscribtion> handlerToSubscribtion = HashMultimap.create();
  51 + private final AtomicInteger nextMessageId = new AtomicInteger(1);
  52 +
  53 + private final MqttClientConfig clientConfig;
  54 +
  55 + private EventLoopGroup eventLoop;
  56 +
  57 + private Channel channel;
  58 +
  59 + private boolean disconnected = false;
  60 + private String host;
  61 + private int port;
  62 + private MqttClientCallback callback;
  63 +
  64 +
  65 + /**
  66 + * Construct the MqttClientImpl with default config
  67 + */
  68 + public MqttClientImpl() {
  69 + this.clientConfig = new MqttClientConfig();
  70 + }
  71 +
  72 + /**
  73 + * Construct the MqttClientImpl with additional config.
  74 + * This config can also be changed using the {@link #getClientConfig()} function
  75 + *
  76 + * @param clientConfig The config object to use while looking for settings
  77 + */
  78 + public MqttClientImpl(MqttClientConfig clientConfig) {
  79 + this.clientConfig = clientConfig;
  80 + }
  81 +
  82 + /**
  83 + * Connect to the specified hostname/ip. By default uses port 1883.
  84 + * If you want to change the port number, see {@link #connect(String, int)}
  85 + *
  86 + * @param host The ip address or host to connect to
  87 + * @return A future which will be completed when the connection is opened and we received an CONNACK
  88 + */
  89 + @Override
  90 + public Future<MqttConnectResult> connect(String host) {
  91 + return connect(host, 1883);
  92 + }
  93 +
  94 + /**
  95 + * Connect to the specified hostname/ip using the specified port
  96 + *
  97 + * @param host The ip address or host to connect to
  98 + * @param port The tcp port to connect to
  99 + * @return A future which will be completed when the connection is opened and we received an CONNACK
  100 + */
  101 + @Override
  102 + public Future<MqttConnectResult> connect(String host, int port) {
  103 + if (this.eventLoop == null) {
  104 + this.eventLoop = new NioEventLoopGroup();
  105 + }
  106 + this.host = host;
  107 + this.port = port;
  108 +
  109 + Promise<MqttConnectResult> connectFuture = new DefaultPromise<>(this.eventLoop.next());
  110 + Bootstrap bootstrap = new Bootstrap();
  111 + bootstrap.group(this.eventLoop);
  112 + bootstrap.channel(clientConfig.getChannelClass());
  113 + bootstrap.remoteAddress(host, port);
  114 + bootstrap.handler(new MqttChannelInitializer(connectFuture, host, port, clientConfig.getSslContext()));
  115 + ChannelFuture future = bootstrap.connect();
  116 + future.addListener((ChannelFutureListener) f -> {
  117 + if (f.isSuccess()) {
  118 + MqttClientImpl.this.channel = f.channel();
  119 + } else if (clientConfig.isReconnect() && !disconnected) {
  120 + eventLoop.schedule((Runnable) () -> connect(host, port), 1L, TimeUnit.SECONDS);
  121 + }
  122 + });
  123 + return connectFuture;
  124 + }
  125 +
  126 + @Override
  127 + public boolean isConnected() {
  128 + if (!disconnected) {
  129 + return channel == null ? false : channel.isActive();
  130 + };
  131 + return false;
  132 + }
  133 +
  134 + @Override
  135 + public Future<MqttConnectResult> reconnect() {
  136 + if (host == null) {
  137 + throw new IllegalStateException("Cannot reconnect. Call connect() first");
  138 + }
  139 + return connect(host, port);
  140 + }
  141 +
  142 + /**
  143 + * Retrieve the netty {@link EventLoopGroup} we are using
  144 + *
  145 + * @return The netty {@link EventLoopGroup} we use for the connection
  146 + */
  147 + @Override
  148 + public EventLoopGroup getEventLoop() {
  149 + return eventLoop;
  150 + }
  151 +
  152 + /**
  153 + * By default we use the netty {@link NioEventLoopGroup}.
  154 + * If you change the EventLoopGroup to another type, make sure to change the {@link Channel} class using {@link MqttClientConfig#setChannelClass(Class)}
  155 + * If you want to force the MqttClient to use another {@link EventLoopGroup}, call this function before calling {@link #connect(String, int)}
  156 + *
  157 + * @param eventLoop The new eventloop to use
  158 + */
  159 + @Override
  160 + public void setEventLoop(EventLoopGroup eventLoop) {
  161 + this.eventLoop = eventLoop;
  162 + }
  163 +
  164 + /**
  165 + * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
  166 + *
  167 + * @param topic The topic filter to subscribe to
  168 + * @param handler The handler to invoke when we receive a message
  169 + * @return A future which will be completed when the server acknowledges our subscribe request
  170 + */
  171 + @Override
  172 + public Future<Void> on(String topic, MqttHandler handler) {
  173 + return on(topic, handler, MqttQoS.AT_MOST_ONCE);
  174 + }
  175 +
  176 + /**
  177 + * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
  178 + *
  179 + * @param topic The topic filter to subscribe to
  180 + * @param handler The handler to invoke when we receive a message
  181 + * @param qos The qos to request to the server
  182 + * @return A future which will be completed when the server acknowledges our subscribe request
  183 + */
  184 + @Override
  185 + public Future<Void> on(String topic, MqttHandler handler, MqttQoS qos) {
  186 + return createSubscribtion(topic, handler, false, qos);
  187 + }
  188 +
  189 + /**
  190 + * Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
  191 + * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
  192 + *
  193 + * @param topic The topic filter to subscribe to
  194 + * @param handler The handler to invoke when we receive a message
  195 + * @return A future which will be completed when the server acknowledges our subscribe request
  196 + */
  197 + @Override
  198 + public Future<Void> once(String topic, MqttHandler handler) {
  199 + return once(topic, handler, MqttQoS.AT_MOST_ONCE);
  200 + }
  201 +
  202 + /**
  203 + * Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
  204 + * This subscribtion is only once. If the MqttClient has received 1 message, the subscribtion will be removed
  205 + *
  206 + * @param topic The topic filter to subscribe to
  207 + * @param handler The handler to invoke when we receive a message
  208 + * @param qos The qos to request to the server
  209 + * @return A future which will be completed when the server acknowledges our subscribe request
  210 + */
  211 + @Override
  212 + public Future<Void> once(String topic, MqttHandler handler, MqttQoS qos) {
  213 + return createSubscribtion(topic, handler, true, qos);
  214 + }
  215 +
  216 + /**
  217 + * Remove the subscribtion for the given topic and handler
  218 + * If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
  219 + *
  220 + * @param topic The topic to unsubscribe for
  221 + * @param handler The handler to unsubscribe
  222 + * @return A future which will be completed when the server acknowledges our unsubscribe request
  223 + */
  224 + @Override
  225 + public Future<Void> off(String topic, MqttHandler handler) {
  226 + Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
  227 + for (MqttSubscribtion subscribtion : this.handlerToSubscribtion.get(handler)) {
  228 + this.subscriptions.remove(topic, subscribtion);
  229 + }
  230 + this.handlerToSubscribtion.removeAll(handler);
  231 + this.checkSubscribtions(topic, future);
  232 + return future;
  233 + }
  234 +
  235 + /**
  236 + * Remove all subscribtions for the given topic.
  237 + * If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
  238 + *
  239 + * @param topic The topic to unsubscribe for
  240 + * @return A future which will be completed when the server acknowledges our unsubscribe request
  241 + */
  242 + @Override
  243 + public Future<Void> off(String topic) {
  244 + Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
  245 + ImmutableSet<MqttSubscribtion> subscribtions = ImmutableSet.copyOf(this.subscriptions.get(topic));
  246 + for (MqttSubscribtion subscribtion : subscribtions) {
  247 + for (MqttSubscribtion handSub : this.handlerToSubscribtion.get(subscribtion.getHandler())) {
  248 + this.subscriptions.remove(topic, handSub);
  249 + }
  250 + this.handlerToSubscribtion.remove(subscribtion.getHandler(), subscribtion);
  251 + }
  252 + this.checkSubscribtions(topic, future);
  253 + return future;
  254 + }
  255 +
  256 + /**
  257 + * Publish a message to the given payload
  258 + *
  259 + * @param topic The topic to publish to
  260 + * @param payload The payload to send
  261 + * @return A future which will be completed when the message is sent out of the MqttClient
  262 + */
  263 + @Override
  264 + public Future<Void> publish(String topic, ByteBuf payload) {
  265 + return publish(topic, payload, MqttQoS.AT_MOST_ONCE, false);
  266 + }
  267 +
  268 + /**
  269 + * Publish a message to the given payload, using the given qos
  270 + *
  271 + * @param topic The topic to publish to
  272 + * @param payload The payload to send
  273 + * @param qos The qos to use while publishing
  274 + * @return A future which will be completed when the message is delivered to the server
  275 + */
  276 + @Override
  277 + public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos) {
  278 + return publish(topic, payload, qos, false);
  279 + }
  280 +
  281 + /**
  282 + * Publish a message to the given payload, using optional retain
  283 + *
  284 + * @param topic The topic to publish to
  285 + * @param payload The payload to send
  286 + * @param retain true if you want to retain the message on the server, false otherwise
  287 + * @return A future which will be completed when the message is sent out of the MqttClient
  288 + */
  289 + @Override
  290 + public Future<Void> publish(String topic, ByteBuf payload, boolean retain) {
  291 + return publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
  292 + }
  293 +
  294 + /**
  295 + * Publish a message to the given payload, using the given qos and optional retain
  296 + *
  297 + * @param topic The topic to publish to
  298 + * @param payload The payload to send
  299 + * @param qos The qos to use while publishing
  300 + * @param retain true if you want to retain the message on the server, false otherwise
  301 + * @return A future which will be completed when the message is delivered to the server
  302 + */
  303 + @Override
  304 + public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain) {
  305 + Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
  306 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
  307 + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
  308 + MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
  309 + MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.messageId(), future, payload.retain(), message, qos);
  310 + ChannelFuture channelFuture = this.sendAndFlushPacket(message);
  311 +
  312 + if (channelFuture != null) {
  313 + pendingPublish.setSent(channelFuture != null);
  314 + if (channelFuture.cause() != null) {
  315 + future.setFailure(channelFuture.cause());
  316 + return future;
  317 + }
  318 + }
  319 + if (pendingPublish.isSent() && pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) {
  320 + pendingPublish.getFuture().setSuccess(null); //We don't get an ACK for QOS 0
  321 + } else if (pendingPublish.isSent()) {
  322 + this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish);
  323 + pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
  324 + }
  325 + return future;
  326 + }
  327 +
  328 + /**
  329 + * Retrieve the MqttClient configuration
  330 + *
  331 + * @return The {@link MqttClientConfig} instance we use
  332 + */
  333 + @Override
  334 + public MqttClientConfig getClientConfig() {
  335 + return clientConfig;
  336 + }
  337 +
  338 + @Override
  339 + public void disconnect() {
  340 + disconnected = true;
  341 + if (this.channel != null) {
  342 + MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
  343 + this.sendAndFlushPacket(message).addListener(future1 -> channel.close());
  344 + }
  345 + }
  346 +
  347 + @Override
  348 + public void setCallback(MqttClientCallback callback) {
  349 + this.callback = callback;
  350 + }
  351 +
  352 +
  353 + ///////////////////////////////////////////// PRIVATE API /////////////////////////////////////////////
  354 +
  355 + ChannelFuture sendAndFlushPacket(Object message) {
  356 + if (this.channel == null) {
  357 + return null;
  358 + }
  359 + if (this.channel.isActive()) {
  360 + return this.channel.writeAndFlush(message);
  361 + }
  362 + ChannelClosedException e = new ChannelClosedException("Channel is closed");
  363 + if (callback != null) {
  364 + callback.connectionLost(e);
  365 + }
  366 + return this.channel.newFailedFuture(e);
  367 + }
  368 +
  369 + private MqttMessageIdVariableHeader getNewMessageId() {
  370 + this.nextMessageId.compareAndSet(0xffff, 1);
  371 + return MqttMessageIdVariableHeader.from(this.nextMessageId.getAndIncrement());
  372 + }
  373 +
  374 + private Future<Void> createSubscribtion(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
  375 + if (this.pendingSubscribeTopics.contains(topic)) {
  376 + Optional<Map.Entry<Integer, MqttPendingSubscribtion>> subscribtionEntry = this.pendingSubscribtions.entrySet().stream().filter((e) -> e.getValue().getTopic().equals(topic)).findAny();
  377 + if (subscribtionEntry.isPresent()) {
  378 + subscribtionEntry.get().getValue().addHandler(handler, once);
  379 + return subscribtionEntry.get().getValue().getFuture();
  380 + }
  381 + }
  382 + if (this.serverSubscribtions.contains(topic)) {
  383 + MqttSubscribtion subscribtion = new MqttSubscribtion(topic, handler, once);
  384 + this.subscriptions.put(topic, subscribtion);
  385 + this.handlerToSubscribtion.put(handler, subscribtion);
  386 + return this.channel.newSucceededFuture();
  387 + }
  388 +
  389 + Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
  390 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
  391 + MqttTopicSubscription subscription = new MqttTopicSubscription(topic, qos);
  392 + MqttMessageIdVariableHeader variableHeader = getNewMessageId();
  393 + MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(subscription));
  394 + MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
  395 +
  396 + final MqttPendingSubscribtion pendingSubscribtion = new MqttPendingSubscribtion(future, topic, message);
  397 + pendingSubscribtion.addHandler(handler, once);
  398 + this.pendingSubscribtions.put(variableHeader.messageId(), pendingSubscribtion);
  399 + this.pendingSubscribeTopics.add(topic);
  400 + pendingSubscribtion.setSent(this.sendAndFlushPacket(message) != null); //If not sent, we will send it when the connection is opened
  401 +
  402 + pendingSubscribtion.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
  403 +
  404 + return future;
  405 + }
  406 +
  407 + private void checkSubscribtions(String topic, Promise<Void> promise) {
  408 + if (!(this.subscriptions.containsKey(topic) && this.subscriptions.get(topic).size() != 0) && this.serverSubscribtions.contains(topic)) {
  409 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
  410 + MqttMessageIdVariableHeader variableHeader = getNewMessageId();
  411 + MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
  412 + MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload);
  413 +
  414 + MqttPendingUnsubscribtion pendingUnsubscribtion = new MqttPendingUnsubscribtion(promise, topic, message);
  415 + this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscribtion);
  416 + pendingUnsubscribtion.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
  417 +
  418 + this.sendAndFlushPacket(message);
  419 + } else {
  420 + promise.setSuccess(null);
  421 + }
  422 + }
  423 +
  424 + IntObjectHashMap<MqttPendingSubscribtion> getPendingSubscribtions() {
  425 + return pendingSubscribtions;
  426 + }
  427 +
  428 + HashMultimap<String, MqttSubscribtion> getSubscriptions() {
  429 + return subscriptions;
  430 + }
  431 +
  432 + Set<String> getPendingSubscribeTopics() {
  433 + return pendingSubscribeTopics;
  434 + }
  435 +
  436 + HashMultimap<MqttHandler, MqttSubscribtion> getHandlerToSubscribtion() {
  437 + return handlerToSubscribtion;
  438 + }
  439 +
  440 + Set<String> getServerSubscribtions() {
  441 + return serverSubscribtions;
  442 + }
  443 +
  444 + IntObjectHashMap<MqttPendingUnsubscribtion> getPendingServerUnsubscribes() {
  445 + return pendingServerUnsubscribes;
  446 + }
  447 +
  448 + IntObjectHashMap<MqttPendingPublish> getPendingPublishes() {
  449 + return pendingPublishes;
  450 + }
  451 +
  452 + IntObjectHashMap<MqttIncomingQos2Publish> getQos2PendingIncomingPublishes() {
  453 + return qos2PendingIncomingPublishes;
  454 + }
  455 +
  456 + private class MqttChannelInitializer extends ChannelInitializer<SocketChannel> {
  457 +
  458 + private final Promise<MqttConnectResult> connectFuture;
  459 + private final String host;
  460 + private final int port;
  461 + private final SslContext sslContext;
  462 +
  463 +
  464 + public MqttChannelInitializer(Promise<MqttConnectResult> connectFuture, String host, int port, SslContext sslContext) {
  465 + this.connectFuture = connectFuture;
  466 + this.host = host;
  467 + this.port = port;
  468 + this.sslContext = sslContext;
  469 + }
  470 +
  471 + @Override
  472 + protected void initChannel(SocketChannel ch) throws Exception {
  473 + if (sslContext != null) {
  474 + ch.pipeline().addLast(sslContext.newHandler(ch.alloc(), host, port));
  475 + }
  476 +
  477 + ch.pipeline().addLast("mqttDecoder", new MqttDecoder());
  478 + ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE);
  479 + ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0));
  480 + ch.pipeline().addLast("mqttPingHandler", new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds()));
  481 + ch.pipeline().addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, connectFuture));
  482 + }
  483 + }
  484 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.channel.ChannelFuture;
  19 +import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
  20 +
  21 +@SuppressWarnings({"WeakerAccess", "unused"})
  22 +public final class MqttConnectResult {
  23 +
  24 + private final boolean success;
  25 + private final MqttConnectReturnCode returnCode;
  26 + private final ChannelFuture closeFuture;
  27 +
  28 + MqttConnectResult(boolean success, MqttConnectReturnCode returnCode, ChannelFuture closeFuture) {
  29 + this.success = success;
  30 + this.returnCode = returnCode;
  31 + this.closeFuture = closeFuture;
  32 + }
  33 +
  34 + public boolean isSuccess() {
  35 + return success;
  36 + }
  37 +
  38 + public MqttConnectReturnCode getReturnCode() {
  39 + return returnCode;
  40 + }
  41 +
  42 + public ChannelFuture getCloseFuture() {
  43 + return closeFuture;
  44 + }
  45 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.buffer.ByteBuf;
  19 +
  20 +public interface MqttHandler {
  21 +
  22 + void onMessage(String topic, ByteBuf payload);
  23 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.channel.EventLoop;
  19 +import io.netty.handler.codec.mqtt.*;
  20 +
  21 +import java.util.function.Consumer;
  22 +
  23 +final class MqttIncomingQos2Publish {
  24 +
  25 + private final MqttPublishMessage incomingPublish;
  26 +
  27 + private final RetransmissionHandler<MqttMessage> retransmissionHandler = new RetransmissionHandler<>();
  28 +
  29 + MqttIncomingQos2Publish(MqttPublishMessage incomingPublish, MqttMessage originalMessage) {
  30 + this.incomingPublish = incomingPublish;
  31 +
  32 + this.retransmissionHandler.setOriginalMessage(originalMessage);
  33 + }
  34 +
  35 + MqttPublishMessage getIncomingPublish() {
  36 + return incomingPublish;
  37 + }
  38 +
  39 + void startPubrecRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
  40 + this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
  41 + sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
  42 + this.retransmissionHandler.start(eventLoop);
  43 + }
  44 +
  45 + void onPubrelReceived() {
  46 + this.retransmissionHandler.stop();
  47 + }
  48 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.handler.codec.mqtt.MqttQoS;
  19 +
  20 +@SuppressWarnings({"WeakerAccess", "unused", "SimplifiableIfStatement", "StringBufferReplaceableByString"})
  21 +public final class MqttLastWill {
  22 +
  23 + private final String topic;
  24 + private final String message;
  25 + private final boolean retain;
  26 + private final MqttQoS qos;
  27 +
  28 + public MqttLastWill(String topic, String message, boolean retain, MqttQoS qos) {
  29 + if(topic == null){
  30 + throw new NullPointerException("topic");
  31 + }
  32 + if(message == null){
  33 + throw new NullPointerException("message");
  34 + }
  35 + if(qos == null){
  36 + throw new NullPointerException("qos");
  37 + }
  38 + this.topic = topic;
  39 + this.message = message;
  40 + this.retain = retain;
  41 + this.qos = qos;
  42 + }
  43 +
  44 + public String getTopic() {
  45 + return topic;
  46 + }
  47 +
  48 + public String getMessage() {
  49 + return message;
  50 + }
  51 +
  52 + public boolean isRetain() {
  53 + return retain;
  54 + }
  55 +
  56 + public MqttQoS getQos() {
  57 + return qos;
  58 + }
  59 +
  60 + public static MqttLastWill.Builder builder(){
  61 + return new MqttLastWill.Builder();
  62 + }
  63 +
  64 + public static final class Builder {
  65 +
  66 + private String topic;
  67 + private String message;
  68 + private boolean retain;
  69 + private MqttQoS qos;
  70 +
  71 + public String getTopic() {
  72 + return topic;
  73 + }
  74 +
  75 + public Builder setTopic(String topic) {
  76 + if(topic == null){
  77 + throw new NullPointerException("topic");
  78 + }
  79 + this.topic = topic;
  80 + return this;
  81 + }
  82 +
  83 + public String getMessage() {
  84 + return message;
  85 + }
  86 +
  87 + public Builder setMessage(String message) {
  88 + if(message == null){
  89 + throw new NullPointerException("message");
  90 + }
  91 + this.message = message;
  92 + return this;
  93 + }
  94 +
  95 + public boolean isRetain() {
  96 + return retain;
  97 + }
  98 +
  99 + public Builder setRetain(boolean retain) {
  100 + this.retain = retain;
  101 + return this;
  102 + }
  103 +
  104 + public MqttQoS getQos() {
  105 + return qos;
  106 + }
  107 +
  108 + public Builder setQos(MqttQoS qos) {
  109 + if(qos == null){
  110 + throw new NullPointerException("qos");
  111 + }
  112 + this.qos = qos;
  113 + return this;
  114 + }
  115 +
  116 + public MqttLastWill build(){
  117 + return new MqttLastWill(topic, message, retain, qos);
  118 + }
  119 + }
  120 +
  121 + @Override
  122 + public boolean equals(Object o) {
  123 + if (this == o) return true;
  124 + if (o == null || getClass() != o.getClass()) return false;
  125 +
  126 + MqttLastWill that = (MqttLastWill) o;
  127 +
  128 + if (retain != that.retain) return false;
  129 + if (!topic.equals(that.topic)) return false;
  130 + if (!message.equals(that.message)) return false;
  131 + return qos == that.qos;
  132 +
  133 + }
  134 +
  135 + @Override
  136 + public int hashCode() {
  137 + int result = topic.hashCode();
  138 + result = 31 * result + message.hashCode();
  139 + result = 31 * result + (retain ? 1 : 0);
  140 + result = 31 * result + qos.hashCode();
  141 + return result;
  142 + }
  143 +
  144 + @Override
  145 + public String toString() {
  146 + final StringBuilder sb = new StringBuilder("MqttLastWill{");
  147 + sb.append("topic='").append(topic).append('\'');
  148 + sb.append(", message='").append(message).append('\'');
  149 + sb.append(", retain=").append(retain);
  150 + sb.append(", qos=").append(qos.name());
  151 + sb.append('}');
  152 + return sb.toString();
  153 + }
  154 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.buffer.ByteBuf;
  19 +import io.netty.channel.EventLoop;
  20 +import io.netty.handler.codec.mqtt.MqttMessage;
  21 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
  22 +import io.netty.handler.codec.mqtt.MqttQoS;
  23 +import io.netty.util.concurrent.Promise;
  24 +
  25 +import java.util.function.Consumer;
  26 +
  27 +final class MqttPendingPublish {
  28 +
  29 + private final int messageId;
  30 + private final Promise<Void> future;
  31 + private final ByteBuf payload;
  32 + private final MqttPublishMessage message;
  33 + private final MqttQoS qos;
  34 +
  35 + private final RetransmissionHandler<MqttPublishMessage> publishRetransmissionHandler = new RetransmissionHandler<>();
  36 + private final RetransmissionHandler<MqttMessage> pubrelRetransmissionHandler = new RetransmissionHandler<>();
  37 +
  38 + private boolean sent = false;
  39 +
  40 + MqttPendingPublish(int messageId, Promise<Void> future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) {
  41 + this.messageId = messageId;
  42 + this.future = future;
  43 + this.payload = payload;
  44 + this.message = message;
  45 + this.qos = qos;
  46 +
  47 + this.publishRetransmissionHandler.setOriginalMessage(message);
  48 + }
  49 +
  50 + int getMessageId() {
  51 + return messageId;
  52 + }
  53 +
  54 + Promise<Void> getFuture() {
  55 + return future;
  56 + }
  57 +
  58 + ByteBuf getPayload() {
  59 + return payload;
  60 + }
  61 +
  62 + boolean isSent() {
  63 + return sent;
  64 + }
  65 +
  66 + void setSent(boolean sent) {
  67 + this.sent = sent;
  68 + }
  69 +
  70 + MqttPublishMessage getMessage() {
  71 + return message;
  72 + }
  73 +
  74 + MqttQoS getQos() {
  75 + return qos;
  76 + }
  77 +
  78 + void startPublishRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
  79 + this.publishRetransmissionHandler.setHandle(((fixedHeader, originalMessage) ->
  80 + sendPacket.accept(new MqttPublishMessage(fixedHeader, originalMessage.variableHeader(), this.payload.retain()))));
  81 + this.publishRetransmissionHandler.start(eventLoop);
  82 + }
  83 +
  84 + void onPubackReceived() {
  85 + this.publishRetransmissionHandler.stop();
  86 + }
  87 +
  88 + void setPubrelMessage(MqttMessage pubrelMessage) {
  89 + this.pubrelRetransmissionHandler.setOriginalMessage(pubrelMessage);
  90 + }
  91 +
  92 + void startPubrelRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
  93 + this.pubrelRetransmissionHandler.setHandle((fixedHeader, originalMessage) ->
  94 + sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
  95 + this.pubrelRetransmissionHandler.start(eventLoop);
  96 + }
  97 +
  98 + void onPubcompReceived() {
  99 + this.pubrelRetransmissionHandler.stop();
  100 + }
  101 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.channel.EventLoop;
  19 +import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
  20 +import io.netty.util.concurrent.Promise;
  21 +
  22 +import java.util.HashSet;
  23 +import java.util.Set;
  24 +import java.util.function.Consumer;
  25 +
  26 +final class MqttPendingSubscribtion {
  27 +
  28 + private final Promise<Void> future;
  29 + private final String topic;
  30 + private final Set<MqttPendingHandler> handlers = new HashSet<>();
  31 + private final MqttSubscribeMessage subscribeMessage;
  32 +
  33 + private final RetransmissionHandler<MqttSubscribeMessage> retransmissionHandler = new RetransmissionHandler<>();
  34 +
  35 + private boolean sent = false;
  36 +
  37 + MqttPendingSubscribtion(Promise<Void> future, String topic, MqttSubscribeMessage message) {
  38 + this.future = future;
  39 + this.topic = topic;
  40 + this.subscribeMessage = message;
  41 +
  42 + this.retransmissionHandler.setOriginalMessage(message);
  43 + }
  44 +
  45 + Promise<Void> getFuture() {
  46 + return future;
  47 + }
  48 +
  49 + String getTopic() {
  50 + return topic;
  51 + }
  52 +
  53 + boolean isSent() {
  54 + return sent;
  55 + }
  56 +
  57 + void setSent(boolean sent) {
  58 + this.sent = sent;
  59 + }
  60 +
  61 + MqttSubscribeMessage getSubscribeMessage() {
  62 + return subscribeMessage;
  63 + }
  64 +
  65 + void addHandler(MqttHandler handler, boolean once){
  66 + this.handlers.add(new MqttPendingHandler(handler, once));
  67 + }
  68 +
  69 + Set<MqttPendingHandler> getHandlers() {
  70 + return handlers;
  71 + }
  72 +
  73 + void startRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
  74 + if(this.sent){ //If the packet is sent, we can start the retransmit timer
  75 + this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
  76 + sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
  77 + this.retransmissionHandler.start(eventLoop);
  78 + }
  79 + }
  80 +
  81 + void onSubackReceived(){
  82 + this.retransmissionHandler.stop();
  83 + }
  84 +
  85 + final class MqttPendingHandler {
  86 + private final MqttHandler handler;
  87 + private final boolean once;
  88 +
  89 + MqttPendingHandler(MqttHandler handler, boolean once) {
  90 + this.handler = handler;
  91 + this.once = once;
  92 + }
  93 +
  94 + MqttHandler getHandler() {
  95 + return handler;
  96 + }
  97 +
  98 + boolean isOnce() {
  99 + return once;
  100 + }
  101 + }
  102 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.channel.EventLoop;
  19 +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
  20 +import io.netty.util.concurrent.Promise;
  21 +
  22 +import java.util.function.Consumer;
  23 +
  24 +final class MqttPendingUnsubscribtion {
  25 +
  26 + private final Promise<Void> future;
  27 + private final String topic;
  28 +
  29 + private final RetransmissionHandler<MqttUnsubscribeMessage> retransmissionHandler = new RetransmissionHandler<>();
  30 +
  31 + MqttPendingUnsubscribtion(Promise<Void> future, String topic, MqttUnsubscribeMessage unsubscribeMessage) {
  32 + this.future = future;
  33 + this.topic = topic;
  34 +
  35 + this.retransmissionHandler.setOriginalMessage(unsubscribeMessage);
  36 + }
  37 +
  38 + Promise<Void> getFuture() {
  39 + return future;
  40 + }
  41 +
  42 + String getTopic() {
  43 + return topic;
  44 + }
  45 +
  46 + void startRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
  47 + this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
  48 + sendPacket.accept(new MqttUnsubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
  49 + this.retransmissionHandler.start(eventLoop);
  50 + }
  51 +
  52 + void onUnsubackReceived(){
  53 + this.retransmissionHandler.stop();
  54 + }
  55 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.channel.Channel;
  19 +import io.netty.channel.ChannelFutureListener;
  20 +import io.netty.channel.ChannelHandlerContext;
  21 +import io.netty.channel.ChannelInboundHandlerAdapter;
  22 +import io.netty.handler.codec.mqtt.MqttFixedHeader;
  23 +import io.netty.handler.codec.mqtt.MqttMessage;
  24 +import io.netty.handler.codec.mqtt.MqttMessageType;
  25 +import io.netty.handler.codec.mqtt.MqttQoS;
  26 +import io.netty.handler.timeout.IdleStateEvent;
  27 +import io.netty.util.ReferenceCountUtil;
  28 +import io.netty.util.concurrent.ScheduledFuture;
  29 +
  30 +import java.util.concurrent.TimeUnit;
  31 +
  32 +final class MqttPingHandler extends ChannelInboundHandlerAdapter {
  33 +
  34 + private final int keepaliveSeconds;
  35 +
  36 + private ScheduledFuture<?> pingRespTimeout;
  37 +
  38 + MqttPingHandler(int keepaliveSeconds) {
  39 + this.keepaliveSeconds = keepaliveSeconds;
  40 + }
  41 +
  42 + @Override
  43 + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  44 + if (!(msg instanceof MqttMessage)) {
  45 + ctx.fireChannelRead(msg);
  46 + return;
  47 + }
  48 + MqttMessage message = (MqttMessage) msg;
  49 + if(message.fixedHeader().messageType() == MqttMessageType.PINGREQ){
  50 + this.handlePingReq(ctx.channel());
  51 + } else if(message.fixedHeader().messageType() == MqttMessageType.PINGRESP){
  52 + this.handlePingResp();
  53 + }else{
  54 + ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
  55 + }
  56 + }
  57 +
  58 + @Override
  59 + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  60 + super.userEventTriggered(ctx, evt);
  61 +
  62 + if(evt instanceof IdleStateEvent){
  63 + IdleStateEvent event = (IdleStateEvent) evt;
  64 + switch(event.state()){
  65 + case READER_IDLE:
  66 + break;
  67 + case WRITER_IDLE:
  68 + this.sendPingReq(ctx.channel());
  69 + break;
  70 + }
  71 + }
  72 + }
  73 +
  74 + private void sendPingReq(Channel channel){
  75 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
  76 + channel.writeAndFlush(new MqttMessage(fixedHeader));
  77 +
  78 + if(this.pingRespTimeout != null){
  79 + this.pingRespTimeout = channel.eventLoop().schedule(() -> {
  80 + MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
  81 + channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE);
  82 + //TODO: what do when the connection is closed ?
  83 + }, this.keepaliveSeconds, TimeUnit.SECONDS);
  84 + }
  85 + }
  86 +
  87 + private void handlePingReq(Channel channel){
  88 + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
  89 + channel.writeAndFlush(new MqttMessage(fixedHeader));
  90 + }
  91 +
  92 + private void handlePingResp(){
  93 + if(this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()){
  94 + this.pingRespTimeout.cancel(true);
  95 + this.pingRespTimeout = null;
  96 + }
  97 + }
  98 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import java.util.regex.Pattern;
  19 +
  20 +final class MqttSubscribtion {
  21 +
  22 + private final String topic;
  23 + private final Pattern topicRegex;
  24 + private final MqttHandler handler;
  25 +
  26 + private final boolean once;
  27 +
  28 + private boolean called;
  29 +
  30 + MqttSubscribtion(String topic, MqttHandler handler, boolean once) {
  31 + if(topic == null){
  32 + throw new NullPointerException("topic");
  33 + }
  34 + if(handler == null){
  35 + throw new NullPointerException("handler");
  36 + }
  37 + this.topic = topic;
  38 + this.handler = handler;
  39 + this.once = once;
  40 + this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$");
  41 + }
  42 +
  43 + String getTopic() {
  44 + return topic;
  45 + }
  46 +
  47 + public MqttHandler getHandler() {
  48 + return handler;
  49 + }
  50 +
  51 + boolean isOnce() {
  52 + return once;
  53 + }
  54 +
  55 + boolean isCalled() {
  56 + return called;
  57 + }
  58 +
  59 + boolean matches(String topic){
  60 + return this.topicRegex.matcher(topic).matches();
  61 + }
  62 +
  63 + @Override
  64 + public boolean equals(Object o) {
  65 + if (this == o) return true;
  66 + if (o == null || getClass() != o.getClass()) return false;
  67 +
  68 + MqttSubscribtion that = (MqttSubscribtion) o;
  69 +
  70 + return once == that.once && topic.equals(that.topic) && handler.equals(that.handler);
  71 + }
  72 +
  73 + @Override
  74 + public int hashCode() {
  75 + int result = topic.hashCode();
  76 + result = 31 * result + handler.hashCode();
  77 + result = 31 * result + (once ? 1 : 0);
  78 + return result;
  79 + }
  80 +
  81 + void setCalled(boolean called) {
  82 + this.called = called;
  83 + }
  84 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.mqtt;
  17 +
  18 +import io.netty.channel.EventLoop;
  19 +import io.netty.handler.codec.mqtt.MqttFixedHeader;
  20 +import io.netty.handler.codec.mqtt.MqttMessage;
  21 +import io.netty.util.concurrent.ScheduledFuture;
  22 +
  23 +import java.util.concurrent.TimeUnit;
  24 +import java.util.function.BiConsumer;
  25 +
  26 +final class RetransmissionHandler<T extends MqttMessage> {
  27 +
  28 + private ScheduledFuture<?> timer;
  29 + private int timeout = 10;
  30 + private BiConsumer<MqttFixedHeader, T> handler;
  31 + private T originalMessage;
  32 +
  33 + void start(EventLoop eventLoop){
  34 + if(eventLoop == null){
  35 + throw new NullPointerException("eventLoop");
  36 + }
  37 + if(this.handler == null){
  38 + throw new NullPointerException("handler");
  39 + }
  40 + this.timeout = 10;
  41 + this.startTimer(eventLoop);
  42 + }
  43 +
  44 + private void startTimer(EventLoop eventLoop){
  45 + this.timer = eventLoop.schedule(() -> {
  46 + this.timeout += 5;
  47 + MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), true, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength());
  48 + handler.accept(fixedHeader, originalMessage);
  49 + startTimer(eventLoop);
  50 + }, timeout, TimeUnit.SECONDS);
  51 + }
  52 +
  53 + void stop(){
  54 + if(this.timer != null){
  55 + this.timer.cancel(true);
  56 + }
  57 + }
  58 +
  59 + void setHandle(BiConsumer<MqttFixedHeader, T> runnable) {
  60 + this.handler = runnable;
  61 + }
  62 +
  63 + void setOriginalMessage(T originalMessage) {
  64 + this.originalMessage = originalMessage;
  65 + }
  66 +}
... ...
... ... @@ -79,7 +79,6 @@
79 79 <dbunit.version>2.5.3</dbunit.version>
80 80 <spring-test-dbunit.version>1.2.1</spring-test-dbunit.version>
81 81 <postgresql.driver.version>9.4.1211</postgresql.driver.version>
82   - <netty-mqtt-client.version>2.0.0TB</netty-mqtt-client.version>
83 82 <sonar.exclusions>org/thingsboard/server/gen/**/*,
84 83 org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/*
85 84 </sonar.exclusions>
... ... @@ -87,6 +86,7 @@
87 86 </properties>
88 87
89 88 <modules>
  89 + <module>netty-mqtt</module>
90 90 <module>common</module>
91 91 <module>rule-engine</module>
92 92 <module>dao</module>
... ... @@ -326,6 +326,11 @@
326 326 <dependencies>
327 327 <dependency>
328 328 <groupId>org.thingsboard</groupId>
  329 + <artifactId>netty-mqtt</artifactId>
  330 + <version>${project.version}</version>
  331 + </dependency>
  332 + <dependency>
  333 + <groupId>org.thingsboard</groupId>
329 334 <artifactId>extensions-api</artifactId>
330 335 <version>${project.version}</version>
331 336 </dependency>
... ... @@ -569,6 +574,11 @@
569 574 <version>${netty.version}</version>
570 575 </dependency>
571 576 <dependency>
  577 + <groupId>io.netty</groupId>
  578 + <artifactId>netty-codec-mqtt</artifactId>
  579 + <version>${netty.version}</version>
  580 + </dependency>
  581 + <dependency>
572 582 <groupId>com.datastax.cassandra</groupId>
573 583 <artifactId>cassandra-driver-core</artifactId>
574 584 <version>${cassandra.version}</version>
... ... @@ -820,11 +830,6 @@
820 830 <scope>provided</scope>
821 831 </dependency>
822 832 <dependency>
823   - <groupId>nl.jk5.netty-mqtt</groupId>
824   - <artifactId>netty-mqtt</artifactId>
825   - <version>${netty-mqtt-client.version}</version>
826   - </dependency>
827   - <dependency>
828 833 <groupId>org.elasticsearch.client</groupId>
829 834 <artifactId>rest</artifactId>
830 835 <version>${elasticsearch.version}</version>
... ...
... ... @@ -69,6 +69,10 @@
69 69 <artifactId>rule-engine-api</artifactId>
70 70 </dependency>
71 71 <dependency>
  72 + <groupId>org.thingsboard</groupId>
  73 + <artifactId>netty-mqtt</artifactId>
  74 + </dependency>
  75 + <dependency>
72 76 <groupId>com.google.guava</groupId>
73 77 <artifactId>guava</artifactId>
74 78 </dependency>
... ... @@ -91,10 +95,6 @@
91 95 <artifactId>amqp-client</artifactId>
92 96 </dependency>
93 97 <dependency>
94   - <groupId>nl.jk5.netty-mqtt</groupId>
95   - <artifactId>netty-mqtt</artifactId>
96   - </dependency>
97   - <dependency>
98 98 <groupId>org.bouncycastle</groupId>
99 99 <artifactId>bcpkix-jdk15on</artifactId>
100 100 </dependency>
... ...
... ... @@ -24,9 +24,9 @@ import io.netty.handler.ssl.SslContext;
24 24 import io.netty.handler.ssl.SslContextBuilder;
25 25 import io.netty.util.concurrent.Future;
26 26 import lombok.extern.slf4j.Slf4j;
27   -import nl.jk5.mqtt.MqttClient;
28   -import nl.jk5.mqtt.MqttClientConfig;
29   -import nl.jk5.mqtt.MqttConnectResult;
  27 +import org.thingsboard.mqtt.MqttClient;
  28 +import org.thingsboard.mqtt.MqttClientConfig;
  29 +import org.thingsboard.mqtt.MqttConnectResult;
30 30 import org.springframework.util.StringUtils;
31 31 import org.thingsboard.rule.engine.TbNodeUtils;
32 32 import org.thingsboard.rule.engine.api.*;
... ...
... ... @@ -17,7 +17,7 @@
17 17 package org.thingsboard.rule.engine.mqtt.credentials;
18 18
19 19 import io.netty.handler.ssl.SslContext;
20   -import nl.jk5.mqtt.MqttClientConfig;
  20 +import org.thingsboard.mqtt.MqttClientConfig;
21 21
22 22 import java.util.Optional;
23 23
... ...
... ... @@ -18,7 +18,7 @@ package org.thingsboard.rule.engine.mqtt.credentials;
18 18
19 19 import io.netty.handler.ssl.SslContext;
20 20 import lombok.Data;
21   -import nl.jk5.mqtt.MqttClientConfig;
  21 +import org.thingsboard.mqtt.MqttClientConfig;
22 22
23 23 import java.util.Optional;
24 24
... ...
... ... @@ -22,7 +22,7 @@ import io.netty.handler.ssl.SslContext;
22 22 import io.netty.handler.ssl.SslContextBuilder;
23 23 import lombok.Data;
24 24 import lombok.extern.slf4j.Slf4j;
25   -import nl.jk5.mqtt.MqttClientConfig;
  25 +import org.thingsboard.mqtt.MqttClientConfig;
26 26 import org.apache.commons.codec.binary.Base64;
27 27 import org.bouncycastle.jce.provider.BouncyCastleProvider;
28 28 import org.bouncycastle.openssl.PEMDecryptorProvider;
... ...
... ... @@ -19,7 +19,7 @@ package org.thingsboard.rule.engine.mqtt.credentials;
19 19 import com.fasterxml.jackson.annotation.JsonSubTypes;
20 20 import com.fasterxml.jackson.annotation.JsonTypeInfo;
21 21 import io.netty.handler.ssl.SslContext;
22   -import nl.jk5.mqtt.MqttClientConfig;
  22 +import org.thingsboard.mqtt.MqttClientConfig;
23 23
24 24 import java.util.Optional;
25 25
... ...