Commit 52f04308d4e81c20ee95f03244177d1f4f5756e3

Authored by Igor Kulikov
1 parent 64b50fa8

Improve JS Executor. Add docker build.

@@ -166,7 +166,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @@ -166,7 +166,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
166 .setScriptBody(scriptIdToBodysMap.get(scriptId)); 166 .setScriptBody(scriptIdToBodysMap.get(scriptId));
167 167
168 for (int i = 0; i < args.length; i++) { 168 for (int i = 0; i < args.length; i++) {
169 - jsRequestBuilder.setArgs(i, args[i].toString()); 169 + jsRequestBuilder.addArgs(args[i].toString());
170 } 170 }
171 171
172 JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() 172 JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
@@ -180,7 +180,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @@ -180,7 +180,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
180 return invokeResult.getResult(); 180 return invokeResult.getResult();
181 } else { 181 } else {
182 log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); 182 log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
183 - throw new RuntimeException(invokeResult.getErrorCode().name()); 183 + throw new RuntimeException(invokeResult.getErrorDetails());
184 } 184 }
185 }); 185 });
186 } 186 }
@@ -174,6 +174,8 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S @@ -174,6 +174,8 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
174 } catch (ExecutionException e) { 174 } catch (ExecutionException e) {
175 if (e.getCause() instanceof ScriptException) { 175 if (e.getCause() instanceof ScriptException) {
176 throw (ScriptException)e.getCause(); 176 throw (ScriptException)e.getCause();
  177 + } else if (e.getCause() instanceof RuntimeException) {
  178 + throw new ScriptException(e.getCause().getMessage());
177 } else { 179 } else {
178 throw new ScriptException(e); 180 throw new ScriptException(e);
179 } 181 }
@@ -28,7 +28,7 @@ public class RuleNodeScriptFactory { @@ -28,7 +28,7 @@ public class RuleNodeScriptFactory {
28 " var metadata = JSON.parse(metadataStr); " + 28 " var metadata = JSON.parse(metadataStr); " +
29 " return JSON.stringify(%s(msg, metadata, msgType));" + 29 " return JSON.stringify(%s(msg, metadata, msgType));" +
30 " function %s(%s, %s, %s) {"; 30 " function %s(%s, %s, %s) {";
31 - private static final String JS_WRAPPER_SUFFIX = "}" + 31 + private static final String JS_WRAPPER_SUFFIX = "\n}" +
32 "\n}"; 32 "\n}";
33 33
34 34
@@ -407,7 +407,7 @@ state: @@ -407,7 +407,7 @@ state:
407 407
408 kafka: 408 kafka:
409 enabled: true 409 enabled: true
410 - bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" 410 + bootstrap.servers: "${TB_KAFKA_SERVERS:192.168.2.157:9092}"
411 acks: "${TB_KAFKA_ACKS:all}" 411 acks: "${TB_KAFKA_ACKS:all}"
412 retries: "${TB_KAFKA_RETRIES:1}" 412 retries: "${TB_KAFKA_RETRIES:1}"
413 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" 413 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
@@ -17,6 +17,9 @@ package org.thingsboard.server.kafka; @@ -17,6 +17,9 @@ package org.thingsboard.server.kafka;
17 17
18 import lombok.Builder; 18 import lombok.Builder;
19 import lombok.Getter; 19 import lombok.Getter;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.apache.kafka.clients.admin.CreateTopicsResult;
  22 +import org.apache.kafka.clients.admin.NewTopic;
20 import org.apache.kafka.clients.producer.KafkaProducer; 23 import org.apache.kafka.clients.producer.KafkaProducer;
21 import org.apache.kafka.clients.producer.ProducerConfig; 24 import org.apache.kafka.clients.producer.ProducerConfig;
22 import org.apache.kafka.clients.producer.ProducerRecord; 25 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -35,6 +38,7 @@ import java.util.function.BiConsumer; @@ -35,6 +38,7 @@ import java.util.function.BiConsumer;
35 /** 38 /**
36 * Created by ashvayka on 24.09.18. 39 * Created by ashvayka on 24.09.18.
37 */ 40 */
  41 +@Slf4j
38 public class TBKafkaProducerTemplate<T> { 42 public class TBKafkaProducerTemplate<T> {
39 43
40 private final KafkaProducer<String, byte[]> producer; 44 private final KafkaProducer<String, byte[]> producer;
@@ -44,7 +48,7 @@ public class TBKafkaProducerTemplate<T> { @@ -44,7 +48,7 @@ public class TBKafkaProducerTemplate<T> {
44 private TbKafkaEnricher<T> enricher = ((value, responseTopic, requestId) -> value); 48 private TbKafkaEnricher<T> enricher = ((value, responseTopic, requestId) -> value);
45 49
46 private final TbKafkaPartitioner<T> partitioner; 50 private final TbKafkaPartitioner<T> partitioner;
47 - private final List<PartitionInfo> partitionInfoList; 51 + private List<PartitionInfo> partitionInfoList;
48 @Getter 52 @Getter
49 private final String defaultTopic; 53 private final String defaultTopic;
50 54
@@ -55,14 +59,24 @@ public class TBKafkaProducerTemplate<T> { @@ -55,14 +59,24 @@ public class TBKafkaProducerTemplate<T> {
55 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 59 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
56 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 60 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
57 this.producer = new KafkaProducer<>(props); 61 this.producer = new KafkaProducer<>(props);
58 - //Maybe this should not be cached, but we don't plan to change size of partitions  
59 - this.partitionInfoList = producer.partitionsFor(defaultTopic);  
60 this.encoder = encoder; 62 this.encoder = encoder;
61 this.enricher = enricher; 63 this.enricher = enricher;
62 this.partitioner = partitioner; 64 this.partitioner = partitioner;
63 this.defaultTopic = defaultTopic; 65 this.defaultTopic = defaultTopic;
64 } 66 }
65 67
  68 + public void init() {
  69 + try {
  70 + TBKafkaAdmin admin = new TBKafkaAdmin();
  71 + CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1));
  72 + result.all().get();
  73 + } catch (Exception e) {
  74 + log.trace("Failed to create topic: {}", e.getMessage(), e);
  75 + }
  76 + //Maybe this should not be cached, but we don't plan to change size of partitions
  77 + this.partitionInfoList = producer.partitionsFor(defaultTopic);
  78 + }
  79 +
66 public T enrich(T value, String responseTopic, UUID requestId) { 80 public T enrich(T value, String responseTopic, UUID requestId) {
67 return enricher.enrich(value, responseTopic, requestId); 81 return enricher.enrich(value, responseTopic, requestId);
68 } 82 }
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
23 import org.apache.kafka.clients.admin.CreateTopicsResult; 23 import org.apache.kafka.clients.admin.CreateTopicsResult;
24 import org.apache.kafka.clients.admin.NewTopic; 24 import org.apache.kafka.clients.admin.NewTopic;
25 import org.apache.kafka.clients.consumer.ConsumerRecords; 25 import org.apache.kafka.clients.consumer.ConsumerRecords;
  26 +import org.apache.kafka.clients.producer.RecordMetadata;
26 import org.apache.kafka.common.header.Header; 27 import org.apache.kafka.common.header.Header;
27 import org.apache.kafka.common.header.internals.RecordHeader; 28 import org.apache.kafka.common.header.internals.RecordHeader;
28 29
@@ -33,11 +34,7 @@ import java.time.Duration; @@ -33,11 +34,7 @@ import java.time.Duration;
33 import java.util.ArrayList; 34 import java.util.ArrayList;
34 import java.util.List; 35 import java.util.List;
35 import java.util.UUID; 36 import java.util.UUID;
36 -import java.util.concurrent.ConcurrentHashMap;  
37 -import java.util.concurrent.ConcurrentMap;  
38 -import java.util.concurrent.ExecutorService;  
39 -import java.util.concurrent.Executors;  
40 -import java.util.concurrent.TimeoutException; 37 +import java.util.concurrent.*;
41 38
42 /** 39 /**
43 * Created by ashvayka on 25.09.18. 40 * Created by ashvayka on 25.09.18.
@@ -86,6 +83,7 @@ public class TbKafkaRequestTemplate<Request, Response> { @@ -86,6 +83,7 @@ public class TbKafkaRequestTemplate<Request, Response> {
86 } catch (Exception e) { 83 } catch (Exception e) {
87 log.trace("Failed to create topic: {}", e.getMessage(), e); 84 log.trace("Failed to create topic: {}", e.getMessage(), e);
88 } 85 }
  86 + this.requestTemplate.init();
89 tickTs = System.currentTimeMillis(); 87 tickTs = System.currentTimeMillis();
90 responseTemplate.subscribe(); 88 responseTemplate.subscribe();
91 executor.submit(() -> { 89 executor.submit(() -> {
  1 +#
  2 +# Copyright © 2016-2018 The Thingsboard Authors
  3 +#
  4 +# Licensed under the Apache License, Version 2.0 (the "License");
  5 +# you may not use this file except in compliance with the License.
  6 +# You may obtain a copy of the License at
  7 +#
  8 +# http://www.apache.org/licenses/LICENSE-2.0
  9 +#
  10 +# Unless required by applicable law or agreed to in writing, software
  11 +# distributed under the License is distributed on an "AS IS" BASIS,
  12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +# See the License for the specific language governing permissions and
  14 +# limitations under the License.
  15 +#
  16 +
  17 +
  18 +version: '2'
  19 +
  20 +services:
  21 + zookeeper:
  22 + image: "wurstmeister/zookeeper"
  23 + ports:
  24 + - "2181"
  25 + kafka:
  26 + image: "wurstmeister/kafka"
  27 + ports:
  28 + - "9092:9092"
  29 + environment:
  30 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  31 + KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
  32 + KAFKA_ADVERTISED_LISTENERS: INSIDE://:9093,OUTSIDE://${KAFKA_HOSTNAME}:9092
  33 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
  34 + KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
  35 + KAFKA_CREATE_TOPICS: "${KAFKA_TOPICS}"
  36 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
  37 + depends_on:
  38 + - zookeeper
  39 + tb-js-executor:
  40 + image: "local-maven-build/tb-js-executor:latest"
  41 + environment:
  42 + TB_KAFKA_SERVERS: kafka:9092
  43 + env_file:
  44 + - tb-js-executor.env
  45 + depends_on:
  46 + - kafka
  1 +
  2 +REMOTE_JS_EVAL_REQUEST_TOPIC=js.eval.requests
  3 +TB_KAFKA_SERVERS=localhost:9092
  4 +LOGGER_LEVEL=debug
  5 +LOG_FOLDER=logs
  6 +LOGGER_FILENAME=tb-js-executor-%DATE%.log
  7 +DOCKER_MODE=true
  1 +/*
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +'use strict';
  17 +
  18 +const vm = require('vm');
  19 +
  20 +function JsExecutor() {
  21 +}
  22 +
  23 +JsExecutor.prototype.compileScript = function(code) {
  24 + return new Promise(function(resolve, reject) {
  25 + try {
  26 + code = "("+code+")(...args)";
  27 + var script = new vm.Script(code);
  28 + resolve(script);
  29 + } catch (err) {
  30 + reject(err);
  31 + }
  32 + });
  33 +}
  34 +
  35 +JsExecutor.prototype.executeScript = function(script, args, timeout) {
  36 + return new Promise(function(resolve, reject) {
  37 + try {
  38 + var sandbox = Object.create(null);
  39 + sandbox.args = args;
  40 + var result = script.runInNewContext(sandbox, {timeout: timeout});
  41 + resolve(result);
  42 + } catch (err) {
  43 + reject(err);
  44 + }
  45 + });
  46 +}
  47 +
  48 +module.exports = JsExecutor;
  1 +/*
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +
  17 +'use strict';
  18 +
  19 +const logger = require('../config/logger')('JsInvokeMessageProcessor'),
  20 + Utils = require('./utils'),
  21 + js = require('./jsinvoke.proto').js,
  22 + KeyedMessage = require('kafka-node').KeyedMessage,
  23 + JsExecutor = require('./jsExecutor');
  24 +
  25 +function JsInvokeMessageProcessor(producer) {
  26 + this.producer = producer;
  27 + this.executor = new JsExecutor();
  28 + this.scriptMap = {};
  29 +}
  30 +
  31 +JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
  32 +
  33 + var requestId;
  34 + try {
  35 + var request = js.RemoteJsRequest.decode(message.value);
  36 + requestId = getRequestId(request);
  37 +
  38 + logger.debug('[%s] Received request, responseTopic: [%s]', requestId, request.responseTopic);
  39 +
  40 + if (request.compileRequest) {
  41 + this.processCompileRequest(requestId, request.responseTopic, request.compileRequest);
  42 + } else if (request.invokeRequest) {
  43 + this.processInvokeRequest(requestId, request.responseTopic, request.invokeRequest);
  44 + } else if (request.releaseRequest) {
  45 + this.processReleaseRequest(requestId, request.responseTopic, request.releaseRequest);
  46 + } else {
  47 + logger.error('[%s] Unknown request recevied!', requestId);
  48 + }
  49 +
  50 + } catch (err) {
  51 + logger.error('[%s] Failed to process request: %s', requestId, err.message);
  52 + logger.error(err.stack);
  53 + }
  54 +}
  55 +
  56 +JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, compileRequest) {
  57 + var scriptId = getScriptId(compileRequest);
  58 + logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
  59 +
  60 + this.executor.compileScript(compileRequest.scriptBody).then(
  61 + (script) => {
  62 + this.scriptMap[scriptId] = script;
  63 + var compileResponse = createCompileResponse(scriptId, true);
  64 + logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId);
  65 + this.sendResponse(requestId, responseTopic, scriptId, compileResponse);
  66 + },
  67 + (err) => {
  68 + var compileResponse = createCompileResponse(scriptId, false, js.JsInvokeErrorCode.COMPILATION_ERROR, err);
  69 + logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId);
  70 + this.sendResponse(requestId, responseTopic, scriptId, compileResponse);
  71 + }
  72 + );
  73 +}
  74 +
  75 +JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, invokeRequest) {
  76 + var scriptId = getScriptId(invokeRequest);
  77 + logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId);
  78 + this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then(
  79 + (script) => {
  80 + this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then(
  81 + (result) => {
  82 + var invokeResponse = createInvokeResponse(result, true);
  83 + logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId);
  84 + this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
  85 + },
  86 + (err) => {
  87 + var errorCode;
  88 + if (err.message.includes('Script execution timed out')) {
  89 + errorCode = js.JsInvokeErrorCode.TIMEOUT_ERROR;
  90 + } else {
  91 + errorCode = js.JsInvokeErrorCode.RUNTIME_ERROR;
  92 + }
  93 + var invokeResponse = createInvokeResponse("", false, errorCode, err);
  94 + logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode);
  95 + this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
  96 + }
  97 + )
  98 + },
  99 + (err) => {
  100 + var invokeResponse = createInvokeResponse("", false, js.JsInvokeErrorCode.COMPILATION_ERROR, err);
  101 + logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, js.JsInvokeErrorCode.COMPILATION_ERROR);
  102 + this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse);
  103 + }
  104 + );
  105 +}
  106 +
  107 +JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, releaseRequest) {
  108 + var scriptId = getScriptId(releaseRequest);
  109 + logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId);
  110 + if (this.scriptMap[scriptId]) {
  111 + delete this.scriptMap[scriptId];
  112 + }
  113 + var releaseResponse = createReleaseResponse(scriptId, true);
  114 + logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId);
  115 + this.sendResponse(requestId, responseTopic, scriptId, null, null, releaseResponse);
  116 +}
  117 +
  118 +JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, scriptId, compileResponse, invokeResponse, releaseResponse) {
  119 + var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
  120 + var rawResponse = js.RemoteJsResponse.encode(remoteResponse).finish();
  121 + const message = new KeyedMessage(scriptId, rawResponse);
  122 + const payloads = [ { topic: responseTopic, messages: message, key: scriptId } ];
  123 + this.producer.send(payloads, function (err, data) {
  124 + if (err) {
  125 + logger.error('[%s] Failed to send response to kafka: %s', requestId, err.message);
  126 + logger.error(err.stack);
  127 + }
  128 + });
  129 +}
  130 +
  131 +JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) {
  132 + var self = this;
  133 + return new Promise(function(resolve, reject) {
  134 + if (self.scriptMap[scriptId]) {
  135 + resolve(self.scriptMap[scriptId]);
  136 + } else {
  137 + self.executor.compileScript(scriptBody).then(
  138 + (script) => {
  139 + self.scriptMap[scriptId] = script;
  140 + resolve(script);
  141 + },
  142 + (err) => {
  143 + reject(err);
  144 + }
  145 + );
  146 + }
  147 + });
  148 +}
  149 +
  150 +function createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse) {
  151 + const requestIdBits = Utils.UUIDToBits(requestId);
  152 + return js.RemoteJsResponse.create(
  153 + {
  154 + requestIdMSB: requestIdBits[0],
  155 + requestIdLSB: requestIdBits[1],
  156 + compileResponse: compileResponse,
  157 + invokeResponse: invokeResponse,
  158 + releaseResponse: releaseResponse
  159 + }
  160 + );
  161 +}
  162 +
  163 +function createCompileResponse(scriptId, success, errorCode, err) {
  164 + const scriptIdBits = Utils.UUIDToBits(scriptId);
  165 + return js.JsCompileResponse.create(
  166 + {
  167 + errorCode: errorCode,
  168 + success: success,
  169 + errorDetails: parseJsErrorDetails(err),
  170 + scriptIdMSB: scriptIdBits[0],
  171 + scriptIdLSB: scriptIdBits[1]
  172 + }
  173 + );
  174 +}
  175 +
  176 +function createInvokeResponse(result, success, errorCode, err) {
  177 + return js.JsInvokeResponse.create(
  178 + {
  179 + errorCode: errorCode,
  180 + success: success,
  181 + errorDetails: parseJsErrorDetails(err),
  182 + result: result
  183 + }
  184 + );
  185 +}
  186 +
  187 +function createReleaseResponse(scriptId, success) {
  188 + const scriptIdBits = Utils.UUIDToBits(scriptId);
  189 + return js.JsReleaseResponse.create(
  190 + {
  191 + success: success,
  192 + scriptIdMSB: scriptIdBits[0],
  193 + scriptIdLSB: scriptIdBits[1]
  194 + }
  195 + );
  196 +}
  197 +
  198 +function parseJsErrorDetails(err) {
  199 + if (!err) {
  200 + return '';
  201 + }
  202 + var details = err.name + ': ' + err.message;
  203 + if (err.stack) {
  204 + var lines = err.stack.split('\n');
  205 + if (lines && lines.length) {
  206 + var line = lines[0];
  207 + var splitted = line.split(':');
  208 + if (splitted && splitted.length === 2) {
  209 + if (!isNaN(splitted[1])) {
  210 + details += ' in at line number ' + splitted[1];
  211 + }
  212 + }
  213 + }
  214 + }
  215 + return details;
  216 +}
  217 +
  218 +function getScriptId(request) {
  219 + return Utils.toUUIDString(request.scriptIdMSB, request.scriptIdLSB);
  220 +}
  221 +
  222 +function getRequestId(request) {
  223 + return Utils.toUUIDString(request.requestIdMSB, request.requestIdLSB);
  224 +}
  225 +
  226 +module.exports = JsInvokeMessageProcessor;
1 -/*  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -  
17 -'use strict';  
18 -  
19 -const logger = require('../config/logger')('JsMessageConsumer');  
20 -const Utils = require('./Utils');  
21 -const js = require('./jsinvoke.proto').js;  
22 -const KeyedMessage = require('kafka-node').KeyedMessage;  
23 -  
24 -  
25 -exports.onJsInvokeMessage = function(message, producer) {  
26 -  
27 - logger.info('Received message: %s', JSON.stringify(message));  
28 -  
29 - var request = js.RemoteJsRequest.decode(message.value);  
30 -  
31 - logger.info('Received request: %s', JSON.stringify(request));  
32 -  
33 - var requestId = getRequestId(request);  
34 -  
35 - logger.info('Received request, responseTopic: [%s]; requestId: [%s]', request.responseTopic, requestId);  
36 -  
37 - if (request.compileRequest) {  
38 - var scriptId = getScriptId(request.compileRequest);  
39 -  
40 - logger.info('Received compile request, scriptId: [%s]', scriptId);  
41 -  
42 - var compileResponse = js.JsCompileResponse.create(  
43 - {  
44 - errorCode: js.JsInvokeErrorCode.COMPILATION_ERROR,  
45 - success: false,  
46 - errorDetails: 'Not Implemented!',  
47 - scriptIdLSB: request.compileRequest.scriptIdLSB,  
48 - scriptIdMSB: request.compileRequest.scriptIdMSB  
49 - }  
50 - );  
51 - const requestIdBits = Utils.UUIDToBits(requestId);  
52 - var response = js.RemoteJsResponse.create(  
53 - {  
54 - requestIdMSB: requestIdBits[0],  
55 - requestIdLSB: requestIdBits[1],  
56 - compileResponse: compileResponse  
57 - }  
58 - );  
59 - var rawResponse = js.RemoteJsResponse.encode(response).finish();  
60 - sendMessage(producer, rawResponse, request.responseTopic, scriptId);  
61 - }  
62 -}  
63 -  
64 -function sendMessage(producer, rawMessage, responseTopic, scriptId) {  
65 - const message = new KeyedMessage(scriptId, rawMessage);  
66 - const payloads = [ { topic: responseTopic, messages: rawMessage, key: scriptId } ];  
67 - producer.send(payloads, function (err, data) {  
68 - console.log(data);  
69 - });  
70 -}  
71 -  
72 -function getScriptId(request) {  
73 - return Utils.toUUIDString(request.scriptIdMSB, request.scriptIdLSB);  
74 -}  
75 -  
76 -function getRequestId(request) {  
77 - return Utils.toUUIDString(request.requestIdMSB, request.requestIdLSB);  
78 -}  
msa/js-executor/api/utils.js renamed from msa/js-executor/api/Utils.js
@@ -16,8 +16,8 @@ @@ -16,8 +16,8 @@
16 16
17 'use strict'; 17 'use strict';
18 18
19 -const Long = require('long');  
20 -const uuidParse = require('uuid-parse'); 19 +const Long = require('long'),
  20 + uuidParse = require('uuid-parse');
21 21
22 var logger = require('../config/logger')('Utils'); 22 var logger = require('../config/logger')('Utils');
23 23
@@ -22,7 +22,7 @@ const { combine, timestamp, label, printf, splat } = format; @@ -22,7 +22,7 @@ const { combine, timestamp, label, printf, splat } = format;
22 22
23 var loggerTransports = []; 23 var loggerTransports = [];
24 24
25 -if (process.env.NODE_ENV !== 'production') { 25 +if (process.env.NODE_ENV !== 'production' || process.env.DOCKER_MODE === 'true') {
26 loggerTransports.push(new transports.Console({ 26 loggerTransports.push(new transports.Console({
27 handleExceptions: true 27 handleExceptions: true
28 })); 28 }));
  1 +#
  2 +# Copyright © 2016-2018 The Thingsboard Authors
  3 +#
  4 +# Licensed under the Apache License, Version 2.0 (the "License");
  5 +# you may not use this file except in compliance with the License.
  6 +# You may obtain a copy of the License at
  7 +#
  8 +# http://www.apache.org/licenses/LICENSE-2.0
  9 +#
  10 +# Unless required by applicable law or agreed to in writing, software
  11 +# distributed under the License is distributed on an "AS IS" BASIS,
  12 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +# See the License for the specific language governing permissions and
  14 +# limitations under the License.
  15 +#
  16 +
  17 +FROM debian:stretch
  18 +
  19 +COPY start-js-executor.sh ${pkg.name}.deb /tmp/
  20 +
  21 +RUN chmod a+x /tmp/*.sh \
  22 + && mv /tmp/start-js-executor.sh /usr/bin
  23 +
  24 +RUN dpkg -i /tmp/${pkg.name}.deb
  25 +
  26 +CMD ["start-js-executor.sh"]
  1 +#!/bin/bash
  2 +#
  3 +# Copyright © 2016-2018 The Thingsboard Authors
  4 +#
  5 +# Licensed under the Apache License, Version 2.0 (the "License");
  6 +# you may not use this file except in compliance with the License.
  7 +# You may obtain a copy of the License at
  8 +#
  9 +# http://www.apache.org/licenses/LICENSE-2.0
  10 +#
  11 +# Unless required by applicable law or agreed to in writing, software
  12 +# distributed under the License is distributed on an "AS IS" BASIS,
  13 +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14 +# See the License for the specific language governing permissions and
  15 +# limitations under the License.
  16 +#
  17 +
  18 +
  19 +echo "Starting '${project.name}' ..."
  20 +
  21 +CONF_FOLDER="${pkg.installFolder}/conf"
  22 +
  23 +mainfile=${pkg.installFolder}/bin/${pkg.name}
  24 +configfile=${pkg.name}.conf
  25 +identity=${pkg.name}
  26 +
  27 +source "${CONF_FOLDER}/${configfile}"
  28 +
  29 +su -s /bin/sh -c "$mainfile"
@@ -40,6 +40,7 @@ @@ -40,6 +40,7 @@
40 <pkg.installFolder>/usr/share/${pkg.name}</pkg.installFolder> 40 <pkg.installFolder>/usr/share/${pkg.name}</pkg.installFolder>
41 <pkg.linux.dist>${project.build.directory}/package/linux</pkg.linux.dist> 41 <pkg.linux.dist>${project.build.directory}/package/linux</pkg.linux.dist>
42 <pkg.win.dist>${project.build.directory}/package/windows</pkg.win.dist> 42 <pkg.win.dist>${project.build.directory}/package/windows</pkg.win.dist>
  43 + <dockerfile.skip>true</dockerfile.skip>
43 </properties> 44 </properties>
44 45
45 <dependencies> 46 <dependencies>
@@ -211,6 +212,22 @@ @@ -211,6 +212,22 @@
211 </filters> 212 </filters>
212 </configuration> 213 </configuration>
213 </execution> 214 </execution>
  215 + <execution>
  216 + <id>copy-docker-config</id>
  217 + <phase>process-resources</phase>
  218 + <goals>
  219 + <goal>copy-resources</goal>
  220 + </goals>
  221 + <configuration>
  222 + <outputDirectory>${project.build.directory}</outputDirectory>
  223 + <resources>
  224 + <resource>
  225 + <directory>docker</directory>
  226 + <filtering>true</filtering>
  227 + </resource>
  228 + </resources>
  229 + </configuration>
  230 + </execution>
214 </executions> 231 </executions>
215 </plugin> 232 </plugin>
216 <plugin> 233 <plugin>
@@ -260,6 +277,27 @@ @@ -260,6 +277,27 @@
260 </execution> 277 </execution>
261 </executions> 278 </executions>
262 </plugin> 279 </plugin>
  280 + <plugin>
  281 + <groupId>com.spotify</groupId>
  282 + <artifactId>dockerfile-maven-plugin</artifactId>
  283 + <version>1.4.4</version>
  284 + <executions>
  285 + <execution>
  286 + <id>build-docker-image</id>
  287 + <phase>pre-integration-test</phase>
  288 + <goals>
  289 + <goal>build</goal>
  290 + </goals>
  291 + </execution>
  292 + </executions>
  293 + <configuration>
  294 + <skip>${dockerfile.skip}</skip>
  295 + <repository>local-maven-build/${pkg.name}</repository>
  296 + <verbose>true</verbose>
  297 + <googleContainerRegistryEnabled>false</googleContainerRegistryEnabled>
  298 + <contextDirectory>${project.build.directory}</contextDirectory>
  299 + </configuration>
  300 + </plugin>
263 </plugins> 301 </plugins>
264 </build> 302 </build>
265 <profiles> 303 <profiles>
@@ -16,17 +16,10 @@ @@ -16,17 +16,10 @@
16 16
17 const config = require('config'), 17 const config = require('config'),
18 kafka = require('kafka-node'), 18 kafka = require('kafka-node'),
19 - Consumer = kafka.Consumer, 19 + ConsumerGroup = kafka.ConsumerGroup,
20 Producer = kafka.Producer, 20 Producer = kafka.Producer,
21 - JsMessageConsumer = require('./api/jsMessageConsumer');  
22 -  
23 -const logger = require('./config/logger')('main');  
24 -  
25 -const kafkaBootstrapServers = config.get('kafka.bootstrap.servers');  
26 -const kafkaRequestTopic = config.get('kafka.request_topic');  
27 -  
28 -logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);  
29 -logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); 21 + JsInvokeMessageProcessor = require('./api/jsInvokeMessageProcessor'),
  22 + logger = require('./config/logger')('main');
30 23
31 var kafkaClient; 24 var kafkaClient;
32 25
@@ -34,35 +27,42 @@ var kafkaClient; @@ -34,35 +27,42 @@ var kafkaClient;
34 try { 27 try {
35 logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); 28 logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
36 29
  30 + const kafkaBootstrapServers = config.get('kafka.bootstrap.servers');
  31 + const kafkaRequestTopic = config.get('kafka.request_topic');
  32 +
  33 + logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
  34 + logger.info('Kafka Requests Topic: %s', kafkaRequestTopic);
  35 +
37 kafkaClient = new kafka.KafkaClient({kafkaHost: kafkaBootstrapServers}); 36 kafkaClient = new kafka.KafkaClient({kafkaHost: kafkaBootstrapServers});
38 37
39 - var consumer = new Consumer(  
40 - kafkaClient,  
41 - [  
42 - { topic: kafkaRequestTopic, partition: 0 }  
43 - ], 38 + var consumer = new ConsumerGroup(
44 { 39 {
  40 + kafkaHost: kafkaBootstrapServers,
  41 + groupId: 'js-executor-group',
45 autoCommit: true, 42 autoCommit: true,
46 encoding: 'buffer' 43 encoding: 'buffer'
47 - } 44 + },
  45 + kafkaRequestTopic
48 ); 46 );
49 47
50 var producer = new Producer(kafkaClient); 48 var producer = new Producer(kafkaClient);
51 producer.on('error', (err) => { 49 producer.on('error', (err) => {
52 - logger.error('Unexpected kafka producer error');  
53 - logger.error(err); 50 + logger.error('Unexpected kafka producer error: %s', err.message);
  51 + logger.error(err.stack);
54 }); 52 });
55 53
  54 + var messageProcessor = new JsInvokeMessageProcessor(producer);
  55 +
56 producer.on('ready', () => { 56 producer.on('ready', () => {
57 consumer.on('message', (message) => { 57 consumer.on('message', (message) => {
58 - JsMessageConsumer.onJsInvokeMessage(message, producer); 58 + messageProcessor.onJsInvokeMessage(message);
59 }); 59 });
  60 + logger.info('Started ThingsBoard JavaScript Executor Microservice.');
60 }); 61 });
61 62
62 - logger.info('Started ThingsBoard JavaScript Executor Microservice.');  
63 } catch (e) { 63 } catch (e) {
64 logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); 64 logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
65 - logger.error(e); 65 + logger.error(e.stack);
66 exit(-1); 66 exit(-1);
67 } 67 }
68 })(); 68 })();
@@ -75,11 +75,13 @@ function exit(status) { @@ -75,11 +75,13 @@ function exit(status) {
75 logger.info('Exiting with status: %d ...', status); 75 logger.info('Exiting with status: %d ...', status);
76 if (kafkaClient) { 76 if (kafkaClient) {
77 logger.info('Stopping Kafka Client...'); 77 logger.info('Stopping Kafka Client...');
78 - kafkaClient.close(() => { 78 + var _kafkaClient = kafkaClient;
  79 + kafkaClient = null;
  80 + _kafkaClient.close(() => {
79 logger.info('Kafka Client stopped.'); 81 logger.info('Kafka Client stopped.');
80 process.exit(status); 82 process.exit(status);
81 }); 83 });
82 } else { 84 } else {
83 process.exit(status); 85 process.exit(status);
84 } 86 }
85 -}  
  87 +}
@@ -2,5 +2,5 @@ @@ -2,5 +2,5 @@
2 2
3 chown -R ${pkg.user}: ${pkg.logFolder} 3 chown -R ${pkg.user}: ${pkg.logFolder}
4 chown -R ${pkg.user}: ${pkg.installFolder} 4 chown -R ${pkg.user}: ${pkg.installFolder}
5 -update-rc.d ${pkg.name} defaults 5 +# update-rc.d ${pkg.name} defaults
6 6