Commit 0dc31fbdde07dada1e79e8d7ce1c57cab97324b9

Authored by YevhenBondarenko
1 parent 4fb309c3

created awsSqs, pubSub, rabbitmq js-executors

... ... @@ -15,20 +15,26 @@
15 15 */
16 16 package org.thingsboard.server.queue.provider;
17 17
  18 +import com.google.protobuf.util.JsonFormat;
18 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.context.annotation.Bean;
19 21 import org.springframework.stereotype.Component;
20 22 import org.thingsboard.server.common.msg.queue.ServiceType;
21   -import org.thingsboard.server.gen.js.JsInvokeProtos;
  23 +import org.thingsboard.server.gen.js.JsInvokeProtos.*;
22 24 import org.thingsboard.server.gen.transport.TransportProtos;
23 25 import org.thingsboard.server.queue.TbQueueAdmin;
24 26 import org.thingsboard.server.queue.TbQueueConsumer;
25 27 import org.thingsboard.server.queue.TbQueueProducer;
26 28 import org.thingsboard.server.queue.TbQueueRequestTemplate;
  29 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
27 30 import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
28 31 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
29 32 import org.thingsboard.server.queue.discovery.PartitionService;
30 33 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  34 +import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
  35 +import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
31 36 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  37 +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
32 38 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
33 39 import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
34 40 import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
... ... @@ -40,6 +46,7 @@ import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes;
40 46 import org.thingsboard.server.queue.sqs.TbAwsSqsSettings;
41 47
42 48 import javax.annotation.PreDestroy;
  49 +import java.nio.charset.StandardCharsets;
43 50
44 51 @Component
45 52 @ConditionalOnExpression("'${queue.type:null}'=='aws-sqs' && '${service.type:null}'=='monolith'")
... ... @@ -52,6 +59,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
52 59 private final TbQueueTransportApiSettings transportApiSettings;
53 60 private final TbQueueTransportNotificationSettings transportNotificationSettings;
54 61 private final TbAwsSqsSettings sqsSettings;
  62 + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
55 63
56 64 private final TbQueueAdmin coreAdmin;
57 65 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -65,7 +73,8 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
65 73 TbQueueTransportApiSettings transportApiSettings,
66 74 TbQueueTransportNotificationSettings transportNotificationSettings,
67 75 TbAwsSqsSettings sqsSettings,
68   - TbAwsSqsQueueAttributes sqsQueueAttributes) {
  76 + TbAwsSqsQueueAttributes sqsQueueAttributes,
  77 + TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
69 78 this.partitionService = partitionService;
70 79 this.coreSettings = coreSettings;
71 80 this.serviceInfoProvider = serviceInfoProvider;
... ... @@ -73,6 +82,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
73 82 this.transportApiSettings = transportApiSettings;
74 83 this.transportNotificationSettings = transportNotificationSettings;
75 84 this.sqsSettings = sqsSettings;
  85 + this.jsInvokeSettings = jsInvokeSettings;
76 86
77 87 this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
78 88 this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
... ... @@ -144,8 +154,26 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
144 154 }
145 155
146 156 @Override
147   - public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
148   - return null;
  157 + @Bean
  158 + public TbQueueRequestTemplate<TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> createRemoteJsRequestTemplate() {
  159 + TbQueueProducer<TbProtoJsQueueMsg<RemoteJsRequest>> producer = new TbAwsSqsProducerTemplate<>(jsExecutorAdmin, sqsSettings, jsInvokeSettings.getRequestTopic());
  160 + TbQueueConsumer<TbProtoQueueMsg<RemoteJsResponse>> consumer = new TbAwsSqsConsumerTemplate<>(jsExecutorAdmin, sqsSettings,
  161 + jsInvokeSettings.getResponseTopic() + "_" + serviceInfoProvider.getServiceId(),
  162 + msg -> {
  163 + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder();
  164 + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
  165 + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
  166 + });
  167 +
  168 + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  169 + <TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
  170 + builder.queueAdmin(jsExecutorAdmin);
  171 + builder.requestTemplate(producer);
  172 + builder.responseTemplate(consumer);
  173 + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
  174 + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
  175 + builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
  176 + return builder.build();
149 177 }
150 178
151 179 @PreDestroy
... ...
... ... @@ -15,10 +15,13 @@
15 15 */
16 16 package org.thingsboard.server.queue.provider;
17 17
  18 +import com.google.protobuf.util.JsonFormat;
18 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.context.annotation.Bean;
19 21 import org.springframework.stereotype.Component;
20 22 import org.thingsboard.server.common.msg.queue.ServiceType;
21   -import org.thingsboard.server.gen.js.JsInvokeProtos;
  23 +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
  24 +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
22 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
23 26 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
24 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
... ... @@ -30,6 +33,7 @@ import org.thingsboard.server.queue.TbQueueAdmin;
30 33 import org.thingsboard.server.queue.TbQueueConsumer;
31 34 import org.thingsboard.server.queue.TbQueueProducer;
32 35 import org.thingsboard.server.queue.TbQueueRequestTemplate;
  36 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
33 37 import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
34 38 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
35 39 import org.thingsboard.server.queue.discovery.PartitionService;
... ... @@ -39,11 +43,14 @@ import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
39 43 import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
40 44 import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
41 45 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  46 +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
42 47 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
43 48 import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
44 49 import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
45 50 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
46 51
  52 +import java.nio.charset.StandardCharsets;
  53 +
47 54 @Component
48 55 @ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='monolith'")
49 56 public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
... ... @@ -56,6 +63,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
56 63 private final TbQueueAdmin admin;
57 64 private final PartitionService partitionService;
58 65 private final TbServiceInfoProvider serviceInfoProvider;
  66 + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
59 67
60 68 public PubSubMonolithQueueFactory(TbPubSubSettings pubSubSettings,
61 69 TbQueueCoreSettings coreSettings,
... ... @@ -63,7 +71,8 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
63 71 TbQueueTransportApiSettings transportApiSettings,
64 72 TbQueueTransportNotificationSettings transportNotificationSettings,
65 73 PartitionService partitionService,
66   - TbServiceInfoProvider serviceInfoProvider) {
  74 + TbServiceInfoProvider serviceInfoProvider,
  75 + TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
67 76 this.pubSubSettings = pubSubSettings;
68 77 this.coreSettings = coreSettings;
69 78 this.ruleEngineSettings = ruleEngineSettings;
... ... @@ -72,6 +81,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
72 81 this.admin = new TbPubSubAdmin(pubSubSettings);
73 82 this.partitionService = partitionService;
74 83 this.serviceInfoProvider = serviceInfoProvider;
  84 + this.jsInvokeSettings = jsInvokeSettings;
75 85 }
76 86
77 87 @Override
... ... @@ -138,7 +148,25 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
138 148 }
139 149
140 150 @Override
141   - public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
142   - return null;
  151 + @Bean
  152 + public TbQueueRequestTemplate<TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> createRemoteJsRequestTemplate() {
  153 + TbQueueProducer<TbProtoJsQueueMsg<RemoteJsRequest>> producer = new TbPubSubProducerTemplate<>(admin, pubSubSettings, jsInvokeSettings.getRequestTopic());
  154 + TbQueueConsumer<TbProtoQueueMsg<RemoteJsResponse>> consumer = new TbPubSubConsumerTemplate<>(admin, pubSubSettings,
  155 + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(),
  156 + msg -> {
  157 + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder();
  158 + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
  159 + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
  160 + });
  161 +
  162 + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  163 + <TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
  164 + builder.queueAdmin(admin);
  165 + builder.requestTemplate(producer);
  166 + builder.responseTemplate(consumer);
  167 + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
  168 + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
  169 + builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
  170 + return builder.build();
143 171 }
144 172 }
... ...
... ... @@ -15,28 +15,37 @@
15 15 */
16 16 package org.thingsboard.server.queue.provider;
17 17
  18 +import com.google.protobuf.util.JsonFormat;
18 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.context.annotation.Bean;
19 21 import org.springframework.stereotype.Component;
20 22 import org.thingsboard.server.common.msg.queue.ServiceType;
21   -import org.thingsboard.server.gen.js.JsInvokeProtos;
  23 +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
  24 +import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
22 25 import org.thingsboard.server.gen.transport.TransportProtos;
23 26 import org.thingsboard.server.queue.TbQueueAdmin;
24 27 import org.thingsboard.server.queue.TbQueueConsumer;
25 28 import org.thingsboard.server.queue.TbQueueProducer;
26 29 import org.thingsboard.server.queue.TbQueueRequestTemplate;
  30 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
27 31 import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
28 32 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
29 33 import org.thingsboard.server.queue.discovery.PartitionService;
30 34 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  35 +import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate;
  36 +import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate;
31 37 import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate;
32 38 import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate;
33 39 import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
34 40 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  41 +import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
35 42 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
36 43 import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
37 44 import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
38 45 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
39 46
  47 +import java.nio.charset.StandardCharsets;
  48 +
40 49 @Component
41 50 @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='monolith'")
42 51 public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
... ... @@ -48,6 +57,8 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
48 57 private final TbQueueTransportApiSettings transportApiSettings;
49 58 private final TbQueueTransportNotificationSettings transportNotificationSettings;
50 59 private final TbRabbitMqSettings rabbitMqSettings;
  60 + private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
  61 +
51 62 private final TbQueueAdmin admin;
52 63
53 64 public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
... ... @@ -56,6 +67,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
56 67 TbQueueTransportApiSettings transportApiSettings,
57 68 TbQueueTransportNotificationSettings transportNotificationSettings,
58 69 TbRabbitMqSettings rabbitMqSettings,
  70 + TbQueueRemoteJsInvokeSettings jsInvokeSettings,
59 71 TbQueueAdmin admin) {
60 72 this.partitionService = partitionService;
61 73 this.coreSettings = coreSettings;
... ... @@ -64,6 +76,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
64 76 this.transportApiSettings = transportApiSettings;
65 77 this.transportNotificationSettings = transportNotificationSettings;
66 78 this.rabbitMqSettings = rabbitMqSettings;
  79 + this.jsInvokeSettings = jsInvokeSettings;
67 80 this.admin = admin;
68 81 }
69 82
... ... @@ -130,7 +143,25 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
130 143 }
131 144
132 145 @Override
133   - public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
134   - return null;
  146 + @Bean
  147 + public TbQueueRequestTemplate<TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> createRemoteJsRequestTemplate() {
  148 + TbQueueProducer<TbProtoJsQueueMsg<RemoteJsRequest>> producer = new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, jsInvokeSettings.getRequestTopic());
  149 + TbQueueConsumer<TbProtoQueueMsg<RemoteJsResponse>> consumer = new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings,
  150 + jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId(),
  151 + msg -> {
  152 + RemoteJsResponse.Builder builder = RemoteJsResponse.newBuilder();
  153 + JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
  154 + return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
  155 + });
  156 +
  157 + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  158 + <TbProtoJsQueueMsg<RemoteJsRequest>, TbProtoQueueMsg<RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
  159 + builder.queueAdmin(admin);
  160 + builder.requestTemplate(producer);
  161 + builder.responseTemplate(consumer);
  162 + builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
  163 + builder.maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout());
  164 + builder.pollInterval(jsInvokeSettings.getResponsePollInterval());
  165 + return builder.build();
135 166 }
136 167 }
... ...
... ... @@ -19,7 +19,6 @@ const COMPILATION_ERROR = 0;
19 19 const RUNTIME_ERROR = 1;
20 20 const TIMEOUT_ERROR = 2;
21 21 const UNRECOGNIZED = -1;
22   -let headers;
23 22
24 23 const config = require('config'),
25 24 logger = require('../config/logger')._logger('JsInvokeMessageProcessor'),
... ... @@ -31,7 +30,7 @@ const useSandbox = config.get('script.use_sandbox') === 'true';
31 30 const maxActiveScripts = Number(config.get('script.max_active_scripts'));
32 31
33 32 function JsInvokeMessageProcessor(producer) {
34   - console.log("Kafka Producer:", producer);
  33 + console.log("Producer:", producer);
35 34 this.producer = producer;
36 35 this.executor = new JsExecutor(useSandbox);
37 36 this.scriptMap = {};
... ... @@ -39,26 +38,27 @@ function JsInvokeMessageProcessor(producer) {
39 38 this.executedScriptsCounter = 0;
40 39 }
41 40
42   -JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
  41 +JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(messageStr) {
43 42
44   - var requestId;
45   - var responseTopic;
  43 + let requestId;
  44 + let responseTopic;
46 45 try {
47   - var request = JSON.parse(message.value.toString('utf8'));
48   - headers = message.headers;
49   - var buf = message.headers['requestId'];
  46 + let message = JSON.parse(messageStr);
  47 + let request = JSON.parse(Buffer.from(message.data).toString('utf8'));
  48 + let headers = message.headers;
  49 + let buf = Buffer.from(headers.data['requestId']);
50 50 requestId = Utils.UUIDFromBuffer(buf);
51   - buf = message.headers['responseTopic'];
  51 + buf = Buffer.from(headers.data['responseTopic']);
52 52 responseTopic = buf.toString('utf8');
53 53
54 54 logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic);
55 55
56 56 if (request.compileRequest) {
57   - this.processCompileRequest(requestId, responseTopic, request.compileRequest);
  57 + this.processCompileRequest(requestId, responseTopic, headers, request.compileRequest);
58 58 } else if (request.invokeRequest) {
59   - this.processInvokeRequest(requestId, responseTopic, request.invokeRequest);
  59 + this.processInvokeRequest(requestId, responseTopic, headers, request.invokeRequest);
60 60 } else if (request.releaseRequest) {
61   - this.processReleaseRequest(requestId, responseTopic, request.releaseRequest);
  61 + this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest);
62 62 } else {
63 63 logger.error('[%s] Unknown request recevied!', requestId);
64 64 }
... ... @@ -69,7 +69,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
69 69 }
70 70 }
71 71
72   -JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, compileRequest) {
  72 +JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, headers, compileRequest) {
73 73 var scriptId = getScriptId(compileRequest);
74 74 logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
75 75
... ... @@ -78,17 +78,17 @@ JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, r
78 78 this.cacheScript(scriptId, script);
79 79 var compileResponse = createCompileResponse(scriptId, true);
80 80 logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId);
81   - this.sendResponse(requestId, responseTopic, scriptId, compileResponse);
  81 + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse);
82 82 },
83 83 (err) => {
84 84 var compileResponse = createCompileResponse(scriptId, false, COMPILATION_ERROR, err);
85 85 logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId);
86   - this.sendResponse(requestId, responseTopic, scriptId, compileResponse);
  86 + this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse);
87 87 }
88 88 );
89 89 }
90 90
91   -JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, invokeRequest) {
  91 +JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, headers, invokeRequest) {
92 92 var scriptId = getScriptId(invokeRequest);
93 93 logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId);
94 94 this.executedScriptsCounter++;
... ... @@ -104,7 +104,7 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, re
104 104 (result) => {
105 105 var invokeResponse = createInvokeResponse(result, true);
106 106 logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId);
107   - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
  107 + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse);
108 108 },
109 109 (err) => {
110 110 var errorCode;
... ... @@ -115,19 +115,19 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, re
115 115 }
116 116 var invokeResponse = createInvokeResponse("", false, errorCode, err);
117 117 logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
118   - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
  118 + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse);
119 119 }
120 120 )
121 121 },
122 122 (err) => {
123 123 var invokeResponse = createInvokeResponse("", false, COMPILATION_ERROR, err);
124 124 logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR);
125   - this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
  125 + this.sendResponse(requestId, responseTopic, headers, scriptId, null, invokeResponse);
126 126 }
127 127 );
128 128 }
129 129
130   -JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, releaseRequest) {
  130 +JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, headers, releaseRequest) {
131 131 var scriptId = getScriptId(releaseRequest);
132 132 logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId);
133 133 if (this.scriptMap[scriptId]) {
... ... @@ -139,28 +139,17 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, r
139 139 }
140 140 var releaseResponse = createReleaseResponse(scriptId, true);
141 141 logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId);
142   - this.sendResponse(requestId, responseTopic, scriptId, null, null, releaseResponse);
  142 + this.sendResponse(requestId, responseTopic, headers, scriptId, null, null, releaseResponse);
143 143 }
144 144
145   -JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, scriptId, compileResponse, invokeResponse, releaseResponse) {
  145 +JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, headers, scriptId, compileResponse, invokeResponse, releaseResponse) {
146 146 var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
147 147 var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
148   - this.producer.send(responseTopic, scriptId, rawResponse, headers
149   - // {
150   - // topic: responseTopic,
151   - // messages: [
152   - // {
153   - // key: scriptId,
154   - // value: rawResponse,
155   - // headers: headers
156   - // }
157   - // ]
158   - // }
159   - ).then(
  148 + this.producer.send(responseTopic, scriptId, rawResponse, headers).then(
160 149 () => {},
161 150 (err) => {
162 151 if (err) {
163   - logger.error('[%s] Failed to send response to kafka: %s', requestId, err.message);
  152 + logger.error('[%s] Failed to send response to queue: %s', requestId, err.message);
164 153 logger.error(err.stack);
165 154 }
166 155 }
... ...
... ... @@ -15,12 +15,33 @@
15 15 #
16 16
17 17 service-type: "TB_SERVICE_TYPE"
  18 +request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC"
18 19
19 20 kafka:
20   - request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC"
21 21 bootstrap:
22 22 # Kafka Bootstrap Servers
23 23 servers: "TB_KAFKA_SERVERS"
  24 +
  25 +pubsub:
  26 + project_id: "TB_QUEUE_PUBSUB_PROJECT_ID"
  27 + service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT"
  28 +
  29 +aws_sqs:
  30 + access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID"
  31 + secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY"
  32 + region: "TB_QUEUE_AWS_SQS_REGION"
  33 +
  34 +rabbitmq:
  35 + exchange_name: "TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME"
  36 + host: "TB_QUEUE_RABBIT_MQ_HOST"
  37 + port: "TB_QUEUE_RABBIT_MQ_PORT"
  38 + virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST"
  39 + username: "TB_QUEUE_RABBIT_MQ_USERNAME"
  40 + password: "TB_QUEUE_RABBIT_MQ_PASSWORD"
  41 + automatic_recovery_enabled: "TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED"
  42 + connection_timeout: "TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT"
  43 + handshake_timeout: "TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT"
  44 +
24 45 logger:
25 46 level: "LOGGER_LEVEL"
26 47 path: "LOG_FOLDER"
... ...
... ... @@ -15,13 +15,25 @@
15 15 #
16 16
17 17 service-type: "kafka"
  18 +request_topic: "js.eval.requests"
18 19
19 20 kafka:
20   - request_topic: "js.eval.requests"
21 21 bootstrap:
22 22 # Kafka Bootstrap Servers
23 23 servers: "localhost:9092"
24 24
  25 +rabbitmq:
  26 + exchange_name: ""
  27 + host: "localhost"
  28 + port: "5672"
  29 + virtual_host: "/"
  30 + username: "YOUR_USERNAME"
  31 + password: "YOUR_PASSWORD"
  32 + automatic_recovery_enabled: "false"
  33 + connection_timeout: "60000"
  34 + handshake_timeout: "10000"
  35 +
  36 +
25 37 logger:
26 38 level: "info"
27 39 path: "logs"
... ...
... ... @@ -15,6 +15,9 @@
15 15 "config": "^3.2.2",
16 16 "js-yaml": "^3.12.0",
17 17 "kafkajs": "^1.11.0",
  18 + "@google-cloud/pubsub": "^1.7.1",
  19 + "aws-sdk": "^2.663.0",
  20 + "amqplib": "^0.5.5",
18 21 "long": "^4.0.0",
19 22 "uuid-parse": "^1.0.0",
20 23 "winston": "^3.0.0",
... ...
  1 +/*
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +'use strict';
  18 +
  19 +const config = require('config'),
  20 + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
  21 + logger = require('../config/logger')._logger('awsSqsTemplate');
  22 +
  23 +const requestTopic = config.get('request_topic');
  24 +
  25 +const accessKeyId = config.get('aws_sqs.access_key_id');
  26 +const secretAccessKey = config.get('aws_sqs.secret_access_key');
  27 +const region = config.get('aws_sqs.region');
  28 +const AWS = require('aws-sdk');
  29 +
  30 +let sqsClient;
  31 +let queueURL;
  32 +let responseTopics = new Map();
  33 +let stopped = false;
  34 +
  35 +function AwsSqsProducer() {
  36 + this.send = async (responseTopic, scriptId, rawResponse, headers) => {
  37 + let msgBody = JSON.stringify(
  38 + {
  39 + key: scriptId,
  40 + data: [...rawResponse],
  41 + headers: headers
  42 + });
  43 +
  44 + let responseQueueUrl = responseTopics.get(responseTopic);
  45 +
  46 + if (!responseQueueUrl) {
  47 + responseQueueUrl = await createQueue(responseTopic);
  48 + responseTopics.set(responseTopic, responseQueueUrl);
  49 + }
  50 +
  51 + let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId};
  52 +
  53 + return new Promise((resolve, reject) => {
  54 + sqsClient.sendMessage(params, function (err, data) {
  55 + if (err) {
  56 + reject(err);
  57 + } else {
  58 + resolve(data);
  59 + }
  60 + });
  61 + });
  62 + }
  63 +}
  64 +
  65 +(async () => {
  66 + try {
  67 + logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
  68 + AWS.config.update({accessKeyId: accessKeyId, secretAccessKey: secretAccessKey, region: region});
  69 +
  70 + sqsClient = new AWS.SQS({apiVersion: '2012-11-05'});
  71 +
  72 + queueURL = await createQueue(requestTopic);
  73 + const messageProcessor = new JsInvokeMessageProcessor(new AwsSqsProducer());
  74 +
  75 + const params = {
  76 + MaxNumberOfMessages: 10,
  77 + QueueUrl: queueURL,
  78 + WaitTimeSeconds: 0.025
  79 + };
  80 + while (!stopped) {
  81 + const messages = await new Promise((resolve, reject) => {
  82 + sqsClient.receiveMessage(params, function (err, data) {
  83 + if (err) {
  84 + reject(err);
  85 + } else {
  86 + resolve(data.Messages);
  87 + }
  88 + });
  89 + });
  90 +
  91 + if (messages && messages.length > 0) {
  92 + const entries = [];
  93 +
  94 + messages.forEach(message => {
  95 + entries.push({
  96 + Id: message.MessageId,
  97 + ReceiptHandle: message.ReceiptHandle
  98 + });
  99 + messageProcessor.onJsInvokeMessage(message.Body);
  100 + });
  101 +
  102 + const deleteBatch = {
  103 + QueueUrl: queueURL,
  104 + Entries: entries
  105 + };
  106 + sqsClient.deleteMessageBatch(deleteBatch, function (err, data) {
  107 + if (err) {
  108 + logger.error("Failed to delete messages from queue.", err.message);
  109 + } else {
  110 + //do nothing
  111 + }
  112 + });
  113 + }
  114 + }
  115 + } catch (e) {
  116 + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
  117 + logger.error(e.stack);
  118 + exit(-1);
  119 + }
  120 +})();
  121 +
  122 +function createQueue(topic) {
  123 + let queueName = topic.replace(/\./g, '_') + '.fifo';
  124 + let queueParams = {
  125 + QueueName: queueName, Attributes: {
  126 + FifoQueue: 'true',
  127 + ContentBasedDeduplication: 'true'
  128 +
  129 + }
  130 + };
  131 + return new Promise((resolve, reject) => {
  132 + sqsClient.createQueue(queueParams, function (err, data) {
  133 + if (err) {
  134 + reject(err);
  135 + } else {
  136 + resolve(data.QueueUrl);
  137 + }
  138 + });
  139 + });
  140 +}
  141 +
  142 +process.on('exit', () => {
  143 + stopped = true;
  144 + logger.info('Aws Sqs client stopped.');
  145 + exit(0);
  146 +});
  147 +
  148 +async function exit(status) {
  149 + logger.info('Exiting with status: %d ...', status);
  150 + if (sqsClient) {
  151 + logger.info('Stopping Aws Sqs client.')
  152 + try {
  153 + await sqsClient.close();
  154 + logger.info('Aws Sqs client is stopped.')
  155 + process.exit(status);
  156 + } catch (e) {
  157 + logger.info('Aws Sqs client stop error.');
  158 + process.exit(status);
  159 + }
  160 + } else {
  161 + process.exit(status);
  162 + }
  163 +}
... ...
msa/js-executor/queue/kafkaTemplate.js renamed from msa/js-executor/queue/kafka/kafkaTemplate.js
... ... @@ -13,39 +13,41 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -const { logLevel, Kafka } = require('kafkajs');
  16 +const {logLevel, Kafka} = require('kafkajs');
17 17
18 18 const config = require('config'),
19   - JsInvokeMessageProcessor = require('../../api/jsInvokeMessageProcessor'),
20   - logger = require('../../config/logger')._logger('main'),
21   - KafkaJsWinstonLogCreator = require('../../config/logger').KafkaJsWinstonLogCreator;
  19 + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
  20 + logger = require('../config/logger')._logger('kafkaTemplate'),
  21 + KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator;
22 22
23   -var kafkaClient;
24   -var consumer;
25   -var producer;
  23 +let kafkaClient;
  24 +let consumer;
  25 +let producer;
26 26
27 27 function KafkaProducer() {
28 28 this.send = async (responseTopic, scriptId, rawResponse, headers) => {
  29 + let headersData = headers.data;
  30 + headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)]));
29 31 return producer.send(
30   - {
31   - topic: responseTopic,
32   - messages: [
33   - {
34   - key: scriptId,
35   - value: rawResponse,
36   - headers: headers
37   - }
38   - ]
39   - });
  32 + {
  33 + topic: responseTopic,
  34 + messages: [
  35 + {
  36 + key: scriptId,
  37 + value: rawResponse,
  38 + headers: headersData
  39 + }
  40 + ]
  41 + });
40 42 }
41 43 }
42 44
43   -(async() => {
  45 +(async () => {
44 46 try {
45 47 logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
46 48
47 49 const kafkaBootstrapServers = config.get('kafka.bootstrap.servers');
48   - const kafkaRequestTopic = config.get('kafka.request_topic');
  50 + const kafkaRequestTopic = config.get('request_topic');
49 51
50 52 logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
51 53 logger.info('Kafka Requests Topic: %s', kafkaRequestTopic);
... ... @@ -56,17 +58,28 @@ function KafkaProducer() {
56 58 logCreator: KafkaJsWinstonLogCreator
57 59 });
58 60
59   - consumer = kafkaClient.consumer({ groupId: 'js-executor-group' });
  61 + consumer = kafkaClient.consumer({groupId: 'js-executor-group'});
60 62 producer = kafkaClient.producer();
61 63 const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer());
62 64 await consumer.connect();
63 65 await producer.connect();
64   - await consumer.subscribe({ topic: kafkaRequestTopic});
  66 + await consumer.subscribe({topic: kafkaRequestTopic});
65 67
66 68 logger.info('Started ThingsBoard JavaScript Executor Microservice.');
67 69 await consumer.run({
68   - eachMessage: async ({ topic, partition, message }) => {
69   - messageProcessor.onJsInvokeMessage(message);
  70 + eachMessage: async ({topic, partition, message}) => {
  71 + let headers = message.headers;
  72 + let key = message.key;
  73 + let data = message.value;
  74 + let msg = {};
  75 +
  76 + headers = Object.fromEntries(
  77 + Object.entries(headers).map(([key, value]) => [key, [...value]]));
  78 +
  79 + msg.key = key.toString('utf8');
  80 + msg.data = [...data];
  81 + msg.headers = {data: headers}
  82 + messageProcessor.onJsInvokeMessage(JSON.stringify(msg));
70 83 },
71 84 });
72 85
... ... @@ -85,7 +98,7 @@ async function exit(status) {
85 98 logger.info('Exiting with status: %d ...', status);
86 99 if (consumer) {
87 100 logger.info('Stopping Kafka Consumer...');
88   - var _consumer = consumer;
  101 + let _consumer = consumer;
89 102 consumer = null;
90 103 try {
91 104 await _consumer.disconnect();
... ...
  1 +/*
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +'use strict';
  18 +
  19 +const config = require('config'),
  20 + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
  21 + logger = require('../config/logger')._logger('pubSubTemplate');
  22 +const {PubSub} = require('@google-cloud/pubsub');
  23 +
  24 +const projectId = config.get('pubsub.project_id');
  25 +const credentials = JSON.parse(config.get('pubsub.service_account'));
  26 +const requestTopic = config.get('request_topic');
  27 +
  28 +let pubSubClient;
  29 +
  30 +function PubSubProducer() {
  31 + this.send = async (responseTopic, scriptId, rawResponse, headers) => {
  32 + let data = JSON.stringify(
  33 + {
  34 + key: scriptId,
  35 + data: [...rawResponse],
  36 + headers: headers
  37 + });
  38 + let dataBuffer = Buffer.from(data);
  39 + return pubSubClient.topic(responseTopic).publish(dataBuffer);
  40 + }
  41 +}
  42 +
  43 +(async () => {
  44 + try {
  45 + logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
  46 + pubSubClient = new PubSub({projectId: projectId, credentials: credentials});
  47 +
  48 + const subscription = pubSubClient.subscription(requestTopic);
  49 +
  50 + const messageProcessor = new JsInvokeMessageProcessor(new PubSubProducer());
  51 +
  52 + const messageHandler = message => {
  53 +
  54 + messageProcessor.onJsInvokeMessage(message.data.toString('utf8'));
  55 + message.ack();
  56 + };
  57 +
  58 + subscription.on('message', messageHandler);
  59 +
  60 + } catch (e) {
  61 + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
  62 + logger.error(e.stack);
  63 + exit(-1);
  64 + }
  65 +})();
  66 +
  67 +process.on('exit', () => {
  68 + exit(0);
  69 +});
  70 +
  71 +async function exit(status) {
  72 + logger.info('Exiting with status: %d ...', status);
  73 + if (pubSubClient) {
  74 + logger.info('Stopping Pub/Sub client.')
  75 + try {
  76 + await pubSubClient.close();
  77 + logger.info('Pub/Sub client is stopped.')
  78 + process.exit(status);
  79 + } catch (e) {
  80 + logger.info('Pub/Sub client stop error.');
  81 + process.exit(status);
  82 + }
  83 + } else {
  84 + process.exit(status);
  85 + }
  86 +}
  87 +
... ...
  1 +/*
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +'use strict';
  18 +
  19 +const config = require('config'),
  20 + JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
  21 + logger = require('../config/logger')._logger('rabbitmqTemplate');
  22 +
  23 +const requestTopic = config.get('request_topic');
  24 +const amqp = require('amqplib/callback_api');
  25 +let connection;
  26 +let channel;
  27 +let stopped = false;
  28 +const responseTopics = [];
  29 +
  30 +function RabbitMqProducer() {
  31 + this.send = async (responseTopic, scriptId, rawResponse, headers) => {
  32 +
  33 + if (!responseTopics.includes(responseTopic)) {
  34 + await createQueue(responseTopic);
  35 + responseTopics.push(responseTopic);
  36 + }
  37 +
  38 + let data = JSON.stringify(
  39 + {
  40 + key: scriptId,
  41 + data: [...rawResponse],
  42 + headers: headers
  43 + });
  44 + let dataBuffer = Buffer.from(data);
  45 + channel.sendToQueue(responseTopic, dataBuffer);
  46 + return new Promise((resolve, reject) => {
  47 + channel.waitForConfirms((err) => {
  48 + if (err) {
  49 + reject(err);
  50 + } else {
  51 + resolve();
  52 + }
  53 + });
  54 + });
  55 + }
  56 +}
  57 +
  58 +(async () => {
  59 + try {
  60 + logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
  61 +
  62 + amqp.credentials.amqplain('admin', 'password');
  63 + connection = await new Promise((resolve, reject) => {
  64 + amqp.connect('amqp://localhost:5672/', function (err, connection) {
  65 + if (err) {
  66 + reject(err);
  67 + } else {
  68 + resolve(connection);
  69 + }
  70 + });
  71 + });
  72 +
  73 + channel = await new Promise((resolve, reject) => {
  74 + connection.createConfirmChannel(function (err, channel) {
  75 + if (err) {
  76 + reject(err);
  77 + } else {
  78 + resolve(channel);
  79 + }
  80 + });
  81 + });
  82 +
  83 + await createQueue(requestTopic);
  84 +
  85 + const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer());
  86 +
  87 + while (!stopped) {
  88 + let message = await new Promise((resolve, reject) => {
  89 + channel.get(requestTopic, {}, function (err, msg) {
  90 + if (err) {
  91 + reject(err);
  92 + } else {
  93 + resolve(msg);
  94 + }
  95 + });
  96 + });
  97 +
  98 + if (message) {
  99 + messageProcessor.onJsInvokeMessage(message.content.toString('utf8'));
  100 + channel.ack(message);
  101 + }
  102 + }
  103 + } catch (e) {
  104 + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
  105 + logger.error(e.stack);
  106 + exit(-1);
  107 + }
  108 +})();
  109 +
  110 +function createQueue(topic) {
  111 + let params = {durable: false};
  112 + return new Promise((resolve, reject) => {
  113 + channel.assertQueue(topic, params, function (err, data) {
  114 + if (err) {
  115 + reject(err);
  116 + } else {
  117 + resolve();
  118 + }
  119 + });
  120 + });
  121 +}
  122 +
  123 +process.on('exit', () => {
  124 + exit(0);
  125 +});
  126 +
  127 +async function exit(status) {
  128 + logger.info('Exiting with status: %d ...', status);
  129 +
  130 + if (channel) {
  131 + logger.info('Stopping RabbitMq chanel.')
  132 + await channel.close();
  133 + logger.info('RabbitMq chanel is stopped');
  134 + }
  135 +
  136 + if (connection) {
  137 + logger.info('Stopping RabbitMq connection.')
  138 + try {
  139 + await connection.close();
  140 + logger.info('RabbitMq client is connection.')
  141 + process.exit(status);
  142 + } catch (e) {
  143 + logger.info('RabbitMq connection stop error.');
  144 + process.exit(status);
  145 + }
  146 + } else {
  147 + process.exit(status);
  148 + }
  149 +}
\ No newline at end of file
... ...
... ... @@ -14,16 +14,32 @@
14 14 * limitations under the License.
15 15 */
16 16
17   -const config = require('config');
  17 +const config = require('config'), logger = require('./config/logger')._logger('main');
18 18
19 19 const serviceType = config.get('service-type');
20 20 switch (serviceType) {
21 21 case 'kafka':
22   - require('./queue/kafka/kafkaTemplate');
23   - console.log('Used kafka template.');
  22 + logger.info('Starting kafka template.');
  23 + require('./queue/kafkaTemplate');
  24 + logger.info('kafka template is started.');
  25 + break;
  26 + case 'pubsub':
  27 + logger.info('Starting Pub/Sub template.')
  28 + require('./queue/pubSubTemplate');
  29 + logger.info('Pub/Sub template is started.')
  30 + break;
  31 + case 'aws-sqs':
  32 + logger.info('Starting Aws Sqs template.')
  33 + require('./queue/awsSqsTemplate');
  34 + logger.info('Aws Sqs template is started.')
  35 + break;
  36 + case 'rabbitmq':
  37 + logger.info('Starting RabbitMq template.')
  38 + require('./queue/rabbitmqTemplate');
  39 + logger.info('RabbitMq template is started.')
24 40 break;
25 41 default:
26   - console.error('Unknown service type: ', serviceType);
  42 + logger.error('Unknown service type: ', serviceType);
27 43 process.exit(-1);
28 44 }
29 45
... ...