Commit 54a344e1d6b3df1de89d39e1c9b00900b4c46729

Authored by Igor Kulikov
1 parent 011bee6e

MQTT Rule Node

... ... @@ -79,6 +79,7 @@
79 79 <dbunit.version>2.5.3</dbunit.version>
80 80 <spring-test-dbunit.version>1.2.1</spring-test-dbunit.version>
81 81 <postgresql.driver.version>9.4.1211</postgresql.driver.version>
  82 + <netty-mqtt-client.version>2.0.0TB</netty-mqtt-client.version>
82 83 <sonar.exclusions>org/thingsboard/server/gen/**/*,
83 84 org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/*
84 85 </sonar.exclusions>
... ... @@ -819,6 +820,11 @@
819 820 <scope>provided</scope>
820 821 </dependency>
821 822 <dependency>
  823 + <groupId>nl.jk5.netty-mqtt</groupId>
  824 + <artifactId>netty-mqtt</artifactId>
  825 + <version>${netty-mqtt-client.version}</version>
  826 + </dependency>
  827 + <dependency>
822 828 <groupId>org.elasticsearch.client</groupId>
823 829 <artifactId>rest</artifactId>
824 830 <version>${elasticsearch.version}</version>
... ...
... ... @@ -101,6 +101,14 @@
101 101 <artifactId>amqp-client</artifactId>
102 102 </dependency>
103 103 <dependency>
  104 + <groupId>nl.jk5.netty-mqtt</groupId>
  105 + <artifactId>netty-mqtt</artifactId>
  106 + </dependency>
  107 + <dependency>
  108 + <groupId>org.bouncycastle</groupId>
  109 + <artifactId>bcpkix-jdk15on</artifactId>
  110 + </dependency>
  111 + <dependency>
104 112 <groupId>junit</groupId>
105 113 <artifactId>junit</artifactId>
106 114 <version>${junit.version}</version>
... ...
... ... @@ -13,17 +13,12 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
  16 +
16 17 package org.thingsboard.rule.engine.aws.sqs;
17 18
18 19 import com.amazonaws.auth.AWSCredentials;
19 20 import com.amazonaws.auth.AWSStaticCredentialsProvider;
20 21 import com.amazonaws.auth.BasicAWSCredentials;
21   -import com.amazonaws.regions.Region;
22   -import com.amazonaws.regions.Regions;
23   -import com.amazonaws.services.sns.AmazonSNS;
24   -import com.amazonaws.services.sns.AmazonSNSClient;
25   -import com.amazonaws.services.sns.model.PublishRequest;
26   -import com.amazonaws.services.sns.model.PublishResult;
27 22 import com.amazonaws.services.sqs.AmazonSQS;
28 23 import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
29 24 import com.amazonaws.services.sqs.model.MessageAttributeValue;
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package org.thingsboard.rule.engine.mqtt;
  18 +
  19 +import io.netty.buffer.Unpooled;
  20 +import io.netty.channel.EventLoopGroup;
  21 +import io.netty.channel.nio.NioEventLoopGroup;
  22 +import io.netty.handler.codec.mqtt.MqttQoS;
  23 +import io.netty.handler.ssl.SslContext;
  24 +import io.netty.handler.ssl.SslContextBuilder;
  25 +import io.netty.util.concurrent.Future;
  26 +import lombok.extern.slf4j.Slf4j;
  27 +import nl.jk5.mqtt.MqttClient;
  28 +import nl.jk5.mqtt.MqttClientConfig;
  29 +import nl.jk5.mqtt.MqttConnectResult;
  30 +import org.springframework.util.StringUtils;
  31 +import org.thingsboard.rule.engine.TbNodeUtils;
  32 +import org.thingsboard.rule.engine.api.*;
  33 +import org.thingsboard.server.common.data.plugin.ComponentType;
  34 +import org.thingsboard.server.common.msg.TbMsg;
  35 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  36 +
  37 +import javax.net.ssl.SSLException;
  38 +import java.nio.charset.Charset;
  39 +import java.util.Optional;
  40 +import java.util.concurrent.ExecutionException;
  41 +import java.util.concurrent.TimeUnit;
  42 +import java.util.concurrent.TimeoutException;
  43 +
  44 +@Slf4j
  45 +@RuleNode(
  46 + type = ComponentType.ACTION,
  47 + name = "mqtt",
  48 + configClazz = TbMqttNodeConfiguration.class,
  49 + nodeDescription = "Publish messages to MQTT broker",
  50 + nodeDetails = "Expects messages with any message type. Will publish message to MQTT broker.",
  51 + uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
  52 + configDirective = "tbActionNodeMqttConfig"
  53 +)
  54 +public class TbMqttNode implements TbNode {
  55 +
  56 + private static final Charset UTF8 = Charset.forName("UTF-8");
  57 +
  58 + private static final String ERROR = "error";
  59 +
  60 + private TbMqttNodeConfiguration config;
  61 +
  62 + private EventLoopGroup eventLoopGroup;
  63 + private MqttClient mqttClient;
  64 +
  65 + @Override
  66 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  67 + try {
  68 + this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
  69 + this.eventLoopGroup = new NioEventLoopGroup();
  70 + this.mqttClient = initClient();
  71 + } catch (Exception e) {
  72 + throw new TbNodeException(e);
  73 + }
  74 + }
  75 +
  76 + @Override
  77 + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
  78 + String topic = TbNodeUtils.processPattern(this.config.getTopicPattern(), msg.getMetaData());
  79 + this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
  80 + .addListener(future -> {
  81 + if (future.isSuccess()) {
  82 + TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData());
  83 + ctx.tellNext(next, TbRelationTypes.SUCCESS);
  84 + } else {
  85 + TbMsg next = processException(ctx, msg, future.cause());
  86 + ctx.tellNext(next, TbRelationTypes.FAILURE, future.cause());
  87 + }
  88 + }
  89 + );
  90 + }
  91 +
  92 + private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {
  93 + TbMsgMetaData metaData = origMsg.getMetaData().copy();
  94 + metaData.putValue(ERROR, e.getClass() + ": " + e.getMessage());
  95 + return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData());
  96 + }
  97 +
  98 + @Override
  99 + public void destroy() {
  100 + if (this.mqttClient != null) {
  101 + this.mqttClient.disconnect();
  102 + }
  103 + if (this.eventLoopGroup != null) {
  104 + this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
  105 + }
  106 + }
  107 +
  108 + private MqttClient initClient() throws Exception {
  109 + Optional<SslContext> sslContextOpt = initSslContext();
  110 + MqttClientConfig config = sslContextOpt.isPresent() ? new MqttClientConfig(sslContextOpt.get()) : new MqttClientConfig();
  111 + if (!StringUtils.isEmpty(this.config.getClientId())) {
  112 + config.setClientId(this.config.getClientId());
  113 + }
  114 + this.config.getCredentials().configure(config);
  115 + MqttClient client = MqttClient.create(config);
  116 + client.setEventLoop(this.eventLoopGroup);
  117 + Future<MqttConnectResult> connectFuture = client.connect(this.config.getHost(), this.config.getPort());
  118 + MqttConnectResult result;
  119 + try {
  120 + result = connectFuture.get(this.config.getConnectTimeoutSec(), TimeUnit.SECONDS);
  121 + } catch (TimeoutException ex) {
  122 + connectFuture.cancel(true);
  123 + client.disconnect();
  124 + String hostPort = this.config.getHost() + ":" + this.config.getPort();
  125 + throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s.", hostPort));
  126 + }
  127 + if (!result.isSuccess()) {
  128 + connectFuture.cancel(true);
  129 + client.disconnect();
  130 + String hostPort = this.config.getHost() + ":" + this.config.getPort();
  131 + throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s. Result code is: %s", hostPort, result.getReturnCode()));
  132 + }
  133 + return client;
  134 + }
  135 +
  136 + private Optional<SslContext> initSslContext() throws SSLException {
  137 + Optional<SslContext> result = this.config.getCredentials().initSslContext();
  138 + if (this.config.isSsl() && !result.isPresent()) {
  139 + result = Optional.of(SslContextBuilder.forClient().build());
  140 + }
  141 + return result;
  142 + }
  143 +
  144 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package org.thingsboard.rule.engine.mqtt;
  18 +
  19 +import lombok.Data;
  20 +import org.thingsboard.rule.engine.api.NodeConfiguration;
  21 +import org.thingsboard.rule.engine.mqtt.credentials.AnonymousCredentials;
  22 +import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
  23 +
  24 +@Data
  25 +public class TbMqttNodeConfiguration implements NodeConfiguration<TbMqttNodeConfiguration> {
  26 +
  27 + private String topicPattern;
  28 + private String host;
  29 + private int port;
  30 + private int connectTimeoutSec;
  31 + private String clientId;
  32 +
  33 + private boolean ssl;
  34 + private MqttClientCredentials credentials;
  35 +
  36 + @Override
  37 + public TbMqttNodeConfiguration defaultConfiguration() {
  38 + TbMqttNodeConfiguration configuration = new TbMqttNodeConfiguration();
  39 + configuration.setTopicPattern("my-topic");
  40 + configuration.setHost("localhost");
  41 + configuration.setPort(1883);
  42 + configuration.setConnectTimeoutSec(10);
  43 + configuration.setSsl(false);
  44 + configuration.setCredentials(new AnonymousCredentials());
  45 + return configuration;
  46 + }
  47 +
  48 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package org.thingsboard.rule.engine.mqtt.credentials;
  18 +
  19 +import io.netty.handler.ssl.SslContext;
  20 +import nl.jk5.mqtt.MqttClientConfig;
  21 +
  22 +import java.util.Optional;
  23 +
  24 +public class AnonymousCredentials implements MqttClientCredentials {
  25 +
  26 + @Override
  27 + public Optional<SslContext> initSslContext() {
  28 + return Optional.empty();
  29 + }
  30 +
  31 + @Override
  32 + public void configure(MqttClientConfig config) {
  33 +
  34 + }
  35 +}
  36 +
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package org.thingsboard.rule.engine.mqtt.credentials;
  18 +
  19 +import io.netty.handler.ssl.SslContext;
  20 +import lombok.Data;
  21 +import nl.jk5.mqtt.MqttClientConfig;
  22 +
  23 +import java.util.Optional;
  24 +
  25 +@Data
  26 +public class BasicCredentials implements MqttClientCredentials {
  27 +
  28 + private String username;
  29 + private String password;
  30 +
  31 + @Override
  32 + public Optional<SslContext> initSslContext() {
  33 + return Optional.empty();
  34 + }
  35 +
  36 + @Override
  37 + public void configure(MqttClientConfig config) {
  38 + config.setUsername(username);
  39 + config.setPassword(password);
  40 + }
  41 +
  42 +}
  43 +
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package org.thingsboard.rule.engine.mqtt.credentials;
  18 +
  19 +import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
  20 +import io.netty.handler.ssl.ClientAuth;
  21 +import io.netty.handler.ssl.SslContext;
  22 +import io.netty.handler.ssl.SslContextBuilder;
  23 +import lombok.Data;
  24 +import lombok.extern.slf4j.Slf4j;
  25 +import nl.jk5.mqtt.MqttClientConfig;
  26 +import org.apache.commons.codec.binary.Base64;
  27 +import org.bouncycastle.jce.provider.BouncyCastleProvider;
  28 +import org.bouncycastle.openssl.PEMDecryptorProvider;
  29 +import org.bouncycastle.openssl.PEMEncryptedKeyPair;
  30 +import org.bouncycastle.openssl.PEMKeyPair;
  31 +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
  32 +import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder;
  33 +import org.springframework.util.StringUtils;
  34 +
  35 +import javax.net.ssl.KeyManagerFactory;
  36 +import javax.net.ssl.TrustManagerFactory;
  37 +import java.io.ByteArrayInputStream;
  38 +import java.security.*;
  39 +import java.security.cert.Certificate;
  40 +import java.security.cert.CertificateFactory;
  41 +import java.security.cert.X509Certificate;
  42 +import java.security.interfaces.RSAPrivateKey;
  43 +import java.security.spec.PKCS8EncodedKeySpec;
  44 +import java.util.Optional;
  45 +
  46 +@Data
  47 +@Slf4j
  48 +@JsonIgnoreProperties(ignoreUnknown = true)
  49 +public class CertPemClientCredentials implements MqttClientCredentials {
  50 +
  51 + private static final String TLS_VERSION = "TLSv1.2";
  52 +
  53 + private String caCert;
  54 + private String cert;
  55 + private String privateKey;
  56 + private String password;
  57 +
  58 + @Override
  59 + public Optional<SslContext> initSslContext() {
  60 + try {
  61 + Security.addProvider(new BouncyCastleProvider());
  62 + return Optional.of(SslContextBuilder.forClient()
  63 + .keyManager(createAndInitKeyManagerFactory())
  64 + .trustManager(createAndInitTrustManagerFactory())
  65 + .clientAuth(ClientAuth.REQUIRE)
  66 + .build());
  67 + } catch (Exception e) {
  68 + log.error("[{}:{}] Creating TLS factory failed!", caCert, cert, e);
  69 + throw new RuntimeException("Creating TLS factory failed!", e);
  70 + }
  71 + }
  72 +
  73 + @Override
  74 + public void configure(MqttClientConfig config) {
  75 +
  76 + }
  77 +
  78 + private KeyManagerFactory createAndInitKeyManagerFactory() throws Exception {
  79 + X509Certificate certHolder = readCertFile(cert);
  80 + Object keyObject = readPrivateKeyFile(privateKey);
  81 + char[] passwordCharArray = "".toCharArray();
  82 + if (!StringUtils.isEmpty(password)) {
  83 + passwordCharArray = password.toCharArray();
  84 + }
  85 +
  86 + JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter().setProvider("BC");
  87 +
  88 + PrivateKey privateKey;
  89 + if (keyObject instanceof PEMEncryptedKeyPair) {
  90 + PEMDecryptorProvider provider = new JcePEMDecryptorProviderBuilder().build(passwordCharArray);
  91 + KeyPair key = keyConverter.getKeyPair(((PEMEncryptedKeyPair) keyObject).decryptKeyPair(provider));
  92 + privateKey = key.getPrivate();
  93 + } else if (keyObject instanceof PEMKeyPair) {
  94 + KeyPair key = keyConverter.getKeyPair((PEMKeyPair) keyObject);
  95 + privateKey = key.getPrivate();
  96 + } else if (keyObject instanceof PrivateKey) {
  97 + privateKey = (PrivateKey)keyObject;
  98 + } else {
  99 + throw new RuntimeException("Unable to get private key from object: " + keyObject.getClass());
  100 + }
  101 +
  102 + KeyStore clientKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
  103 + clientKeyStore.load(null, null);
  104 + clientKeyStore.setCertificateEntry("cert", certHolder);
  105 + clientKeyStore.setKeyEntry("private-key",
  106 + privateKey,
  107 + passwordCharArray,
  108 + new Certificate[]{certHolder});
  109 +
  110 + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
  111 + keyManagerFactory.init(clientKeyStore, passwordCharArray);
  112 + return keyManagerFactory;
  113 + }
  114 +
  115 + private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception {
  116 + X509Certificate caCertHolder;
  117 + caCertHolder = readCertFile(caCert);
  118 +
  119 + KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
  120 + caKeyStore.load(null, null);
  121 + caKeyStore.setCertificateEntry("caCert-cert", caCertHolder);
  122 +
  123 + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
  124 + trustManagerFactory.init(caKeyStore);
  125 + return trustManagerFactory;
  126 + }
  127 +
  128 + private X509Certificate readCertFile(String fileContent) throws Exception {
  129 + X509Certificate certificate = null;
  130 + if (fileContent != null && !fileContent.trim().isEmpty()) {
  131 + fileContent = fileContent.replace("-----BEGIN CERTIFICATE-----", "")
  132 + .replace("-----END CERTIFICATE-----", "")
  133 + .replaceAll("\\s", "");
  134 + byte[] decoded = Base64.decodeBase64(fileContent);
  135 + CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
  136 + certificate = (X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(decoded));
  137 + }
  138 + return certificate;
  139 + }
  140 +
  141 + private PrivateKey readPrivateKeyFile(String fileContent) throws Exception {
  142 + RSAPrivateKey privateKey = null;
  143 + if (fileContent != null && !fileContent.isEmpty()) {
  144 + fileContent = fileContent.replaceAll(".*BEGIN.*PRIVATE KEY.*", "")
  145 + .replaceAll(".*END.*PRIVATE KEY.*", "")
  146 + .replaceAll("\\s", "");
  147 + byte[] decoded = Base64.decodeBase64(fileContent);
  148 + KeyFactory keyFactory = KeyFactory.getInstance("RSA");
  149 + privateKey = (RSAPrivateKey) keyFactory.generatePrivate(new PKCS8EncodedKeySpec(decoded));
  150 + }
  151 + return privateKey;
  152 + }
  153 +
  154 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +package org.thingsboard.rule.engine.mqtt.credentials;
  18 +
  19 +import com.fasterxml.jackson.annotation.JsonSubTypes;
  20 +import com.fasterxml.jackson.annotation.JsonTypeInfo;
  21 +import io.netty.handler.ssl.SslContext;
  22 +import nl.jk5.mqtt.MqttClientConfig;
  23 +
  24 +import java.util.Optional;
  25 +
  26 +@JsonTypeInfo(
  27 + use = JsonTypeInfo.Id.NAME,
  28 + include = JsonTypeInfo.As.PROPERTY,
  29 + property = "type")
  30 +@JsonSubTypes({
  31 + @JsonSubTypes.Type(value = AnonymousCredentials.class, name = "anonymous"),
  32 + @JsonSubTypes.Type(value = BasicCredentials.class, name = "basic"),
  33 + @JsonSubTypes.Type(value = CertPemClientCredentials.class, name = "cert.PEM")})
  34 +public interface MqttClientCredentials {
  35 +
  36 + Optional<SslContext> initSslContext();
  37 +
  38 + void configure(MqttClientConfig config);
  39 +}
  40 +
... ...