Commit 43b2eedbd5cc82caa7e5c66ca7125de7d4702b81

Authored by YevhenBondarenko
1 parent 8c413f8b

azure service bus js-executor

... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.azure.servicebus;
18 18 import com.microsoft.azure.servicebus.management.ManagementClient;
19 19 import com.microsoft.azure.servicebus.management.QueueDescription;
20 20 import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
  21 +import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException;
21 22 import com.microsoft.azure.servicebus.primitives.ServiceBusException;
22 23 import lombok.extern.slf4j.Slf4j;
23 24 import org.thingsboard.server.queue.TbQueueAdmin;
... ... @@ -71,7 +72,12 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
71 72 client.createQueue(queueDescription);
72 73 queues.add(topic);
73 74 } catch (ServiceBusException | InterruptedException e) {
74   - log.error("Failed to create queue: [{}]", topic, e);
  75 + if (e instanceof MessagingEntityAlreadyExistsException) {
  76 + queues.add(topic);
  77 + log.info("[{}] queue already exists.", topic);
  78 + } else {
  79 + log.error("Failed to create queue: [{}]", topic, e);
  80 + }
75 81 }
76 82 }
77 83
... ...
... ... @@ -15,7 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.queue.provider;
17 17
  18 +import com.google.protobuf.util.JsonFormat;
18 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.context.annotation.Bean;
19 21 import org.springframework.stereotype.Component;
20 22 import org.thingsboard.server.common.msg.queue.ServiceType;
21 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
... ... @@ -35,19 +37,20 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat
35 37 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
36 38 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
37 39 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
  40 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
38 41 import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
39 42 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
40 43 import org.thingsboard.server.queue.discovery.PartitionService;
41 44 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
42   -import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
43   -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
44 45 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  46 +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
45 47 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
46 48 import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
47 49 import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
48 50 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
49 51
50 52 import javax.annotation.PreDestroy;
  53 +import java.nio.charset.StandardCharsets;
51 54
52 55 @Component
53 56 @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'")
... ... @@ -60,6 +63,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
60 63 private final TbQueueTransportApiSettings transportApiSettings;
61 64 private final TbQueueTransportNotificationSettings transportNotificationSettings;
62 65 private final TbServiceBusSettings serviceBusSettings;
  66 + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
63 67
64 68 private final TbQueueAdmin coreAdmin;
65 69 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -73,6 +77,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
73 77 TbQueueTransportApiSettings transportApiSettings,
74 78 TbQueueTransportNotificationSettings transportNotificationSettings,
75 79 TbServiceBusSettings serviceBusSettings,
  80 + TbQueueRemoteJsInvokeSettings jsInvokeSettings,
76 81 TbServiceBusQueueConfigs serviceBusQueueConfigs) {
77 82 this.partitionService = partitionService;
78 83 this.coreSettings = coreSettings;
... ... @@ -81,6 +86,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
81 86 this.transportApiSettings = transportApiSettings;
82 87 this.transportNotificationSettings = transportNotificationSettings;
83 88 this.serviceBusSettings = serviceBusSettings;
  89 + this.jsInvokeSettings = jsInvokeSettings;
84 90
85 91 this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
86 92 this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
... ... @@ -152,8 +158,26 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
152 158 }
153 159
154 160 @Override
  161 + @Bean
155 162 public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
156   - return null;
  163 + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic());
  164 + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings,
  165 + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(),
  166 + msg -> {
  167 + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
  168 + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
  169 + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
  170 + });
  171 +
  172 + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  173 + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
  174 + builder.queueAdmin(jsExecutorAdmin);
  175 + builder.requestTemplate(producer);
  176 + builder.responseTemplate(consumer);
  177 + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
  178 + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
  179 + builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
  180 + return builder.build();
157 181 }
158 182
159 183 @PreDestroy
... ...
... ... @@ -15,7 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.queue.provider;
17 17
  18 +import com.google.protobuf.util.JsonFormat;
18 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.context.annotation.Bean;
19 21 import org.springframework.stereotype.Component;
20 22 import org.thingsboard.server.common.msg.queue.ServiceType;
21 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
... ... @@ -34,15 +36,18 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat
34 36 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
35 37 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
36 38 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
  39 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
37 40 import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
38 41 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
39 42 import org.thingsboard.server.queue.discovery.PartitionService;
40 43 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
41 44 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  45 +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
42 46 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
43 47 import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
44 48
45 49 import javax.annotation.PreDestroy;
  50 +import java.nio.charset.StandardCharsets;
46 51
47 52 @Component
48 53 @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'")
... ... @@ -54,6 +59,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
54 59 private final TbQueueTransportApiSettings transportApiSettings;
55 60 private final PartitionService partitionService;
56 61 private final TbServiceInfoProvider serviceInfoProvider;
  62 + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
57 63
58 64 private final TbQueueAdmin coreAdmin;
59 65 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -67,6 +73,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
67 73 TbQueueRuleEngineSettings ruleEngineSettings,
68 74 PartitionService partitionService,
69 75 TbServiceInfoProvider serviceInfoProvider,
  76 + TbQueueRemoteJsInvokeSettings jsInvokeSettings,
70 77 TbServiceBusQueueConfigs serviceBusQueueConfigs) {
71 78 this.serviceBusSettings = serviceBusSettings;
72 79 this.coreSettings = coreSettings;
... ... @@ -74,6 +81,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
74 81 this.ruleEngineSettings = ruleEngineSettings;
75 82 this.partitionService = partitionService;
76 83 this.serviceInfoProvider = serviceInfoProvider;
  84 + this.jsInvokeSettings = jsInvokeSettings;
77 85
78 86 this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
79 87 this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
... ... @@ -132,8 +140,26 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
132 140 }
133 141
134 142 @Override
  143 + @Bean
135 144 public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
136   - return null;
  145 + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic());
  146 + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings,
  147 + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(),
  148 + msg -> {
  149 + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
  150 + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
  151 + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
  152 + });
  153 +
  154 + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  155 + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
  156 + builder.queueAdmin(jsExecutorAdmin);
  157 + builder.requestTemplate(producer);
  158 + builder.responseTemplate(consumer);
  159 + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
  160 + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
  161 + builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
  162 + return builder.build();
137 163 }
138 164
139 165 @PreDestroy
... ...
... ... @@ -15,7 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.queue.provider;
17 17
  18 +import com.google.protobuf.util.JsonFormat;
18 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.context.annotation.Bean;
19 21 import org.springframework.stereotype.Component;
20 22 import org.thingsboard.server.common.msg.queue.ServiceType;
21 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
... ... @@ -32,15 +34,18 @@ import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplat
32 34 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate;
33 35 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs;
34 36 import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings;
  37 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
35 38 import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
36 39 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
37 40 import org.thingsboard.server.queue.discovery.PartitionService;
38 41 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
39 42 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  43 +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
40 44 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
41 45 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
42 46
43 47 import javax.annotation.PreDestroy;
  48 +import java.nio.charset.StandardCharsets;
44 49
45 50 @Component
46 51 @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'")
... ... @@ -51,6 +56,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
51 56 private final TbServiceInfoProvider serviceInfoProvider;
52 57 private final TbQueueRuleEngineSettings ruleEngineSettings;
53 58 private final TbServiceBusSettings serviceBusSettings;
  59 + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
54 60
55 61 private final TbQueueAdmin coreAdmin;
56 62 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -61,12 +67,14 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
61 67 TbQueueRuleEngineSettings ruleEngineSettings,
62 68 TbServiceInfoProvider serviceInfoProvider,
63 69 TbServiceBusSettings serviceBusSettings,
  70 + TbQueueRemoteJsInvokeSettings jsInvokeSettings,
64 71 TbServiceBusQueueConfigs serviceBusQueueConfigs) {
65 72 this.partitionService = partitionService;
66 73 this.coreSettings = coreSettings;
67 74 this.serviceInfoProvider = serviceInfoProvider;
68 75 this.ruleEngineSettings = ruleEngineSettings;
69 76 this.serviceBusSettings = serviceBusSettings;
  77 + this.jsInvokeSettings = jsInvokeSettings;
70 78
71 79 this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
72 80 this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
... ... @@ -113,8 +121,26 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
113 121 }
114 122
115 123 @Override
  124 + @Bean
116 125 public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
117   - return null;
  126 + TbQueueProducer<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> producer = new TbServiceBusProducerTemplate<>(jsExecutorAdmin, serviceBusSettings, jsInvokeSettings.getRequestTopic());
  127 + TbQueueConsumer<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> consumer = new TbServiceBusConsumerTemplate<>(jsExecutorAdmin, serviceBusSettings,
  128 + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(),
  129 + msg -> {
  130 + JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
  131 + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
  132 + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
  133 + });
  134 +
  135 + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  136 + <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
  137 + builder.queueAdmin(jsExecutorAdmin);
  138 + builder.requestTemplate(producer);
  139 + builder.responseTemplate(consumer);
  140 + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
  141 + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
  142 + builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
  143 + return builder.build();
118 144 }
119 145
120 146 @PreDestroy
... ...
... ... @@ -38,12 +38,11 @@ function JsInvokeMessageProcessor(producer) {
38 38 this.executedScriptsCounter = 0;
39 39 }
40 40
41   -JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(messageStr) {
  41 +JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
42 42
43 43 let requestId;
44 44 let responseTopic;
45 45 try {
46   - let message = JSON.parse(messageStr);
47 46 let request = JSON.parse(Buffer.from(message.data).toString('utf8'));
48 47 let headers = message.headers;
49 48 let buf = Buffer.from(headers.data['requestId']);
... ...
... ... @@ -14,7 +14,7 @@
14 14 # limitations under the License.
15 15 #
16 16
17   -service-type: "TB_SERVICE_TYPE"
  17 +service-type: "TB_SERVICE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
18 18 request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC"
19 19
20 20 js:
... ... @@ -46,6 +46,13 @@ rabbitmq:
46 46 password: "TB_QUEUE_RABBIT_MQ_PASSWORD"
47 47 queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES"
48 48
  49 +service_bus:
  50 + namespace_name: "TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME"
  51 + sas_key_name: "TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME"
  52 + sas_key: "TB_QUEUE_SERVICE_BUS_SAS_KEY"
  53 + max_messages: "TB_QUEUE_SERVICE_BUS_MAX_MESSAGES"
  54 + queue-properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES"
  55 +
49 56 logger:
50 57 level: "LOGGER_LEVEL"
51 58 path: "LOG_FOLDER"
... ...
... ... @@ -41,6 +41,8 @@ rabbitmq:
41 41 password: "password"
42 42 queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000"
43 43
  44 +service_bus:
  45 + queue-properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800"
44 46
45 47 logger:
46 48 level: "info"
... ...
... ... @@ -18,6 +18,8 @@
18 18 "@google-cloud/pubsub": "^1.7.1",
19 19 "aws-sdk": "^2.663.0",
20 20 "amqplib": "^0.5.5",
  21 + "@azure/service-bus": "^1.1.6",
  22 + "azure-sb": "^0.11.1",
21 23 "long": "^4.0.0",
22 24 "uuid-parse": "^1.0.0",
23 25 "winston": "^3.0.0",
... ...
... ... @@ -113,7 +113,7 @@ function AwsSqsProducer() {
113 113 Id: message.MessageId,
114 114 ReceiptHandle: message.ReceiptHandle
115 115 });
116   - messageProcessor.onJsInvokeMessage(message.Body);
  116 + messageProcessor.onJsInvokeMessage(JSON.parse(message.Body));
117 117 });
118 118
119 119 const deleteBatch = {
... ... @@ -187,7 +187,7 @@ async function exit(status) {
187 187 logger.info('Stopping Aws Sqs client.')
188 188 try {
189 189 await sqsClient.close();
190   - logger.info('Aws Sqs client is stopped.')
  190 + logger.info('Aws Sqs client stopped.')
191 191 process.exit(status);
192 192 } catch (e) {
193 193 logger.info('Aws Sqs client stop error.');
... ...
... ... @@ -105,7 +105,7 @@ function KafkaProducer() {
105 105 msg.key = key.toString('utf8');
106 106 msg.data = [...data];
107 107 msg.headers = {data: headers}
108   - messageProcessor.onJsInvokeMessage(JSON.stringify(msg));
  108 + messageProcessor.onJsInvokeMessage(msg);
109 109 },
110 110 });
111 111
... ...
... ... @@ -83,7 +83,7 @@ function PubSubProducer() {
83 83
84 84 const messageHandler = message => {
85 85
86   - messageProcessor.onJsInvokeMessage(message.data.toString('utf8'));
  86 + messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8')));
87 87 message.ack();
88 88 };
89 89
... ... @@ -141,7 +141,7 @@ async function exit(status) {
141 141 logger.info('Stopping Pub/Sub client.')
142 142 try {
143 143 await pubSubClient.close();
144   - logger.info('Pub/Sub client is stopped.')
  144 + logger.info('Pub/Sub client stopped.')
145 145 process.exit(status);
146 146 } catch (e) {
147 147 logger.info('Pub/Sub client stop error.');
... ...
... ... @@ -109,7 +109,7 @@ function RabbitMqProducer() {
109 109 });
110 110
111 111 if (message) {
112   - messageProcessor.onJsInvokeMessage(message.content.toString('utf8'));
  112 + messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
113 113 channel.ack(message);
114 114 } else {
115 115 await sleep(poolInterval);
... ... @@ -132,7 +132,7 @@ function parseQueueProperties() {
132 132
133 133 function createQueue(topic) {
134 134 return new Promise((resolve, reject) => {
135   - channel.assertQueue(topic, queueParams, function (err, data) {
  135 + channel.assertQueue(topic, queueParams, function (err) {
136 136 if (err) {
137 137 reject(err);
138 138 } else {
... ... @@ -158,14 +158,14 @@ async function exit(status) {
158 158 if (channel) {
159 159 logger.info('Stopping RabbitMq chanel.')
160 160 await channel.close();
161   - logger.info('RabbitMq chanel is stopped');
  161 + logger.info('RabbitMq chanel stopped');
162 162 }
163 163
164 164 if (connection) {
165 165 logger.info('Stopping RabbitMq connection.')
166 166 try {
167 167 await connection.close();
168   - logger.info('RabbitMq client is connection.')
  168 + logger.info('RabbitMq client connection.')
169 169 process.exit(status);
170 170 } catch (e) {
171 171 logger.info('RabbitMq connection stop error.');
... ...
  1 +/*
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +'use strict';
  18 +
  19 +const config = require('config'),
  20 + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
  21 + logger = require('../config/logger')._logger('serviceBusTemplate');
  22 +const {ServiceBusClient, ReceiveMode} = require("@azure/service-bus");
  23 +const azure = require('azure-sb');
  24 +
  25 +const requestTopic = config.get('request_topic');
  26 +const namespaceName = config.get('service_bus.namespace_name');
  27 +const sasKeyName = config.get('service_bus.sas_key_name');
  28 +const sasKey = config.get('service_bus.sas_key');
  29 +const queueProperties = config.get('service_bus.queue-properties');
  30 +
  31 +let sbClient;
  32 +let receiverClient;
  33 +let receiver;
  34 +let serviceBusService;
  35 +
  36 +let queueOptions = {};
  37 +const queues = [];
  38 +const senderMap = new Map();
  39 +
  40 +function ServiceBusProducer() {
  41 + this.send = async (responseTopic, scriptId, rawResponse, headers) => {
  42 + if (!queues.includes(requestTopic)) {
  43 + await createQueueIfNotExist(requestTopic);
  44 + queues.push(requestTopic);
  45 + }
  46 +
  47 + let customSender = senderMap.get(responseTopic);
  48 +
  49 + if (!customSender) {
  50 + customSender = new CustomSender(responseTopic);
  51 + senderMap.set(responseTopic, customSender);
  52 + }
  53 +
  54 + let data = {
  55 + key: scriptId,
  56 + data: [...rawResponse],
  57 + headers: headers
  58 + };
  59 +
  60 + return customSender.send({body: data});
  61 + }
  62 +}
  63 +
  64 +function CustomSender(topic) {
  65 + this.queueClient = sbClient.createQueueClient(topic);
  66 + this.sender = this.queueClient.createSender();
  67 +
  68 + this.send = async (message) => {
  69 + return this.sender.send(message);
  70 + }
  71 +}
  72 +
  73 +(async () => {
  74 + try {
  75 + logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
  76 +
  77 + const connectionString = `Endpoint=sb://${namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${sasKeyName};SharedAccessKey=${sasKey}`;
  78 + sbClient = ServiceBusClient.createFromConnectionString(connectionString);
  79 + serviceBusService = azure.createServiceBusService(connectionString);
  80 +
  81 + parseQueueProperties();
  82 +
  83 + await new Promise((resolve, reject) => {
  84 + serviceBusService.listQueues((err, data) => {
  85 + if (err) {
  86 + reject(err);
  87 + } else {
  88 + data.forEach(queue => {
  89 + queues.push(queue.QueueName);
  90 + });
  91 + resolve();
  92 + }
  93 + });
  94 + });
  95 +
  96 + if (!queues.includes(requestTopic)) {
  97 + await createQueueIfNotExist(requestTopic);
  98 + queues.push(requestTopic);
  99 + }
  100 +
  101 + receiverClient = sbClient.createQueueClient(requestTopic);
  102 + receiver = receiverClient.createReceiver(ReceiveMode.peekLock);
  103 +
  104 + const messageProcessor = new JsInvokeMessageProcessor(new ServiceBusProducer());
  105 +
  106 + const messageHandler = async (message) => {
  107 + if (message) {
  108 + messageProcessor.onJsInvokeMessage(message.body);
  109 + await message.complete();
  110 + }
  111 + };
  112 + const errorHandler = (error) => {
  113 + logger.error('Failed to receive message from queue.', error);
  114 + };
  115 + receiver.registerMessageHandler(messageHandler, errorHandler);
  116 + } catch (e) {
  117 + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
  118 + logger.error(e.stack);
  119 + exit(-1);
  120 + }
  121 +})();
  122 +
  123 +async function createQueueIfNotExist(topic) {
  124 + return new Promise((resolve, reject) => {
  125 + serviceBusService.createQueueIfNotExists(topic, queueOptions, (err) => {
  126 + if (err) {
  127 + reject(err);
  128 + } else {
  129 + resolve();
  130 + }
  131 + });
  132 + });
  133 +}
  134 +
  135 +function parseQueueProperties() {
  136 + let properties = {};
  137 + const props = queueProperties.split(';');
  138 + props.forEach(p => {
  139 + const delimiterPosition = p.indexOf(':');
  140 + properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
  141 + });
  142 + queueOptions = {
  143 + MaxSizeInMegabytes: properties['maxSizeInMb'],
  144 + DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`,
  145 + LockDuration: `PT${properties['lockDurationInSec']}S`
  146 + };
  147 +}
  148 +
  149 +process.on('exit', () => {
  150 + exit(0);
  151 +});
  152 +
  153 +async function exit(status) {
  154 + logger.info('Exiting with status: %d ...', status);
  155 + logger.info('Stopping Azure Service Bus resources...')
  156 + if (receiver) {
  157 + try {
  158 + await receiver.close();
  159 + } catch (e) {
  160 +
  161 + }
  162 + }
  163 +
  164 + if (receiverClient) {
  165 + try {
  166 + await receiverClient.close();
  167 + } catch (e) {
  168 +
  169 + }
  170 + }
  171 +
  172 + senderMap.forEach((k, v) => {
  173 + try {
  174 + v.sender.close();
  175 + } catch (e) {
  176 +
  177 + }
  178 + try {
  179 + v.queueClient.close();
  180 + } catch (e) {
  181 +
  182 + }
  183 + });
  184 +
  185 + if (sbClient) {
  186 + try {
  187 + sbClient.close();
  188 + } catch (e) {
  189 +
  190 + }
  191 + }
  192 + logger.info('Azure Service Bus resources stopped.')
  193 + process.exit(status);
  194 +}
\ No newline at end of file
... ...
... ... @@ -21,22 +21,27 @@ switch (serviceType) {
21 21 case 'kafka':
22 22 logger.info('Starting kafka template.');
23 23 require('./queue/kafkaTemplate');
24   - logger.info('kafka template is started.');
  24 + logger.info('kafka template started.');
25 25 break;
26 26 case 'pubsub':
27 27 logger.info('Starting Pub/Sub template.')
28 28 require('./queue/pubSubTemplate');
29   - logger.info('Pub/Sub template is started.')
  29 + logger.info('Pub/Sub template started.')
30 30 break;
31 31 case 'aws-sqs':
32 32 logger.info('Starting Aws Sqs template.')
33 33 require('./queue/awsSqsTemplate');
34   - logger.info('Aws Sqs template is started.')
  34 + logger.info('Aws Sqs template started.')
35 35 break;
36 36 case 'rabbitmq':
37 37 logger.info('Starting RabbitMq template.')
38 38 require('./queue/rabbitmqTemplate');
39   - logger.info('RabbitMq template is started.')
  39 + logger.info('RabbitMq template started.')
  40 + break;
  41 + case 'service-bus':
  42 + logger.info('Starting Azure Service Bus template.')
  43 + require('./queue/serviceBusTemplate');
  44 + logger.info('Azure Service Bus template started.')
40 45 break;
41 46 default:
42 47 logger.error('Unknown service type: ', serviceType);
... ...
... ... @@ -41,7 +41,7 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
41 41 name = "delay",
42 42 configClazz = TbMsgDelayNodeConfiguration.class,
43 43 nodeDescription = "Delays incoming message",
44   - nodeDetails = "Delays messages for configurable period.",
  44 + nodeDetails = "Delays messages for configurable period. Please note, this node acknowledges the message from the current queue (message will be removed from queue)",
45 45 icon = "pause",
46 46 uiResources = {"static/rulenode/rulenode-core-config.js"},
47 47 configDirective = "tbActionNodeMsgDelayConfig"
... ...