Commit 64b50fa8bc859cc155d0f2465e4bac4fd5e71f64
1 parent
05e2981b
Remote Js Executor basic message processing.
Showing
14 changed files
with
216 additions
and
34 deletions
... | ... | @@ -78,6 +78,22 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
78 | 78 | requestBuilder.settings(kafkaSettings); |
79 | 79 | requestBuilder.defaultTopic(requestTopic); |
80 | 80 | requestBuilder.encoder(new RemoteJsRequestEncoder()); |
81 | + requestBuilder.enricher((request, responseTopic, requestId) -> { | |
82 | + JsInvokeProtos.RemoteJsRequest.Builder remoteRequest = JsInvokeProtos.RemoteJsRequest.newBuilder(); | |
83 | + if (request.hasCompileRequest()) { | |
84 | + remoteRequest.setCompileRequest(request.getCompileRequest()); | |
85 | + } | |
86 | + if (request.hasInvokeRequest()) { | |
87 | + remoteRequest.setInvokeRequest(request.getInvokeRequest()); | |
88 | + } | |
89 | + if (request.hasReleaseRequest()) { | |
90 | + remoteRequest.setReleaseRequest(request.getReleaseRequest()); | |
91 | + } | |
92 | + remoteRequest.setResponseTopic(responseTopic); | |
93 | + remoteRequest.setRequestIdMSB(requestId.getMostSignificantBits()); | |
94 | + remoteRequest.setRequestIdLSB(requestId.getLeastSignificantBits()); | |
95 | + return remoteRequest.build(); | |
96 | + }); | |
81 | 97 | |
82 | 98 | TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<JsInvokeProtos.RemoteJsResponse> responseBuilder = TBKafkaConsumerTemplate.builder(); |
83 | 99 | responseBuilder.settings(kafkaSettings); |
... | ... | @@ -87,6 +103,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
87 | 103 | responseBuilder.autoCommit(true); |
88 | 104 | responseBuilder.autoCommitIntervalMs(autoCommitInterval); |
89 | 105 | responseBuilder.decoder(new RemoteJsResponseDecoder()); |
106 | + responseBuilder.requestIdExtractor((response) -> { | |
107 | + return new UUID(response.getRequestIdMSB(), response.getRequestIdLSB()); | |
108 | + }); | |
90 | 109 | |
91 | 110 | TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder |
92 | 111 | <JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> builder = TbKafkaRequestTemplate.builder(); |
... | ... | @@ -128,7 +147,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
128 | 147 | return compiledScriptId; |
129 | 148 | } else { |
130 | 149 | log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); |
131 | - throw new RuntimeException(compilationResult.getErrorCode().name()); | |
150 | + throw new RuntimeException(compilationResult.getErrorDetails()); | |
132 | 151 | } |
133 | 152 | }); |
134 | 153 | } | ... | ... |
... | ... | @@ -48,7 +48,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S |
48 | 48 | try { |
49 | 49 | this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get(); |
50 | 50 | } catch (Exception e) { |
51 | - throw new IllegalArgumentException("Can't compile script: " + e.getMessage(), e); | |
51 | + Throwable t = e; | |
52 | + if (e instanceof ExecutionException) { | |
53 | + t = e.getCause(); | |
54 | + } | |
55 | + throw new IllegalArgumentException("Can't compile script: " + t.getMessage(), t); | |
52 | 56 | } |
53 | 57 | } |
54 | 58 | ... | ... |
... | ... | @@ -26,15 +26,20 @@ enum JsInvokeErrorCode { |
26 | 26 | } |
27 | 27 | |
28 | 28 | message RemoteJsRequest { |
29 | - JsCompileRequest compileRequest = 1; | |
30 | - JsInvokeRequest invokeRequest = 2; | |
31 | - JsReleaseRequest releaseRequest = 3; | |
29 | + string responseTopic = 1; | |
30 | + int64 requestIdMSB = 2; | |
31 | + int64 requestIdLSB = 3; | |
32 | + JsCompileRequest compileRequest = 4; | |
33 | + JsInvokeRequest invokeRequest = 5; | |
34 | + JsReleaseRequest releaseRequest = 6; | |
32 | 35 | } |
33 | 36 | |
34 | 37 | message RemoteJsResponse { |
35 | - JsCompileResponse compileResponse = 1; | |
36 | - JsInvokeResponse invokeResponse = 2; | |
37 | - JsReleaseResponse releaseResponse = 3; | |
38 | + int64 requestIdMSB = 1; | |
39 | + int64 requestIdLSB = 2; | |
40 | + JsCompileResponse compileResponse = 3; | |
41 | + JsInvokeResponse invokeResponse = 4; | |
42 | + JsReleaseResponse releaseResponse = 5; | |
38 | 43 | } |
39 | 44 | |
40 | 45 | message JsCompileRequest { | ... | ... |
... | ... | @@ -26,6 +26,7 @@ import java.io.IOException; |
26 | 26 | import java.time.Duration; |
27 | 27 | import java.util.Collections; |
28 | 28 | import java.util.Properties; |
29 | +import java.util.UUID; | |
29 | 30 | |
30 | 31 | /** |
31 | 32 | * Created by ashvayka on 24.09.18. |
... | ... | @@ -34,11 +35,15 @@ public class TBKafkaConsumerTemplate<T> { |
34 | 35 | |
35 | 36 | private final KafkaConsumer<String, byte[]> consumer; |
36 | 37 | private final TbKafkaDecoder<T> decoder; |
38 | + | |
39 | + @Builder.Default | |
40 | + private TbKafkaRequestIdExtractor<T> requestIdExtractor = ((response) -> null); | |
41 | + | |
37 | 42 | @Getter |
38 | 43 | private final String topic; |
39 | 44 | |
40 | 45 | @Builder |
41 | - private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, | |
46 | + private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, TbKafkaRequestIdExtractor<T> requestIdExtractor, | |
42 | 47 | String clientId, String groupId, String topic, |
43 | 48 | boolean autoCommit, int autoCommitIntervalMs) { |
44 | 49 | Properties props = settings.toProps(); |
... | ... | @@ -50,6 +55,7 @@ public class TBKafkaConsumerTemplate<T> { |
50 | 55 | props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); |
51 | 56 | this.consumer = new KafkaConsumer<>(props); |
52 | 57 | this.decoder = decoder; |
58 | + this.requestIdExtractor = requestIdExtractor; | |
53 | 59 | this.topic = topic; |
54 | 60 | } |
55 | 61 | |
... | ... | @@ -68,4 +74,8 @@ public class TBKafkaConsumerTemplate<T> { |
68 | 74 | public T decode(ConsumerRecord<String, byte[]> record) throws IOException { |
69 | 75 | return decoder.decode(record.value()); |
70 | 76 | } |
77 | + | |
78 | + public UUID extractRequestId(T value) { | |
79 | + return requestIdExtractor.extractRequestId(value); | |
80 | + } | |
71 | 81 | } | ... | ... |
... | ... | @@ -24,9 +24,13 @@ import org.apache.kafka.clients.producer.RecordMetadata; |
24 | 24 | import org.apache.kafka.common.PartitionInfo; |
25 | 25 | import org.apache.kafka.common.header.Header; |
26 | 26 | |
27 | +import java.nio.ByteBuffer; | |
28 | +import java.util.Arrays; | |
27 | 29 | import java.util.List; |
28 | 30 | import java.util.Properties; |
31 | +import java.util.UUID; | |
29 | 32 | import java.util.concurrent.Future; |
33 | +import java.util.function.BiConsumer; | |
30 | 34 | |
31 | 35 | /** |
32 | 36 | * Created by ashvayka on 24.09.18. |
... | ... | @@ -35,13 +39,18 @@ public class TBKafkaProducerTemplate<T> { |
35 | 39 | |
36 | 40 | private final KafkaProducer<String, byte[]> producer; |
37 | 41 | private final TbKafkaEncoder<T> encoder; |
42 | + | |
43 | + @Builder.Default | |
44 | + private TbKafkaEnricher<T> enricher = ((value, responseTopic, requestId) -> value); | |
45 | + | |
38 | 46 | private final TbKafkaPartitioner<T> partitioner; |
39 | 47 | private final List<PartitionInfo> partitionInfoList; |
40 | 48 | @Getter |
41 | 49 | private final String defaultTopic; |
42 | 50 | |
43 | 51 | @Builder |
44 | - private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaPartitioner<T> partitioner, String defaultTopic) { | |
52 | + private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder<T> encoder, TbKafkaEnricher<T> enricher, | |
53 | + TbKafkaPartitioner<T> partitioner, String defaultTopic) { | |
45 | 54 | Properties props = settings.toProps(); |
46 | 55 | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); |
47 | 56 | props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); |
... | ... | @@ -49,10 +58,15 @@ public class TBKafkaProducerTemplate<T> { |
49 | 58 | //Maybe this should not be cached, but we don't plan to change size of partitions |
50 | 59 | this.partitionInfoList = producer.partitionsFor(defaultTopic); |
51 | 60 | this.encoder = encoder; |
61 | + this.enricher = enricher; | |
52 | 62 | this.partitioner = partitioner; |
53 | 63 | this.defaultTopic = defaultTopic; |
54 | 64 | } |
55 | 65 | |
66 | + public T enrich(T value, String responseTopic, UUID requestId) { | |
67 | + return enricher.enrich(value, responseTopic, requestId); | |
68 | + } | |
69 | + | |
56 | 70 | public Future<RecordMetadata> send(String key, T value) { |
57 | 71 | return send(key, value, null, null); |
58 | 72 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2018 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.kafka; | |
17 | + | |
18 | +import java.util.UUID; | |
19 | + | |
20 | +public interface TbKafkaEnricher<T> { | |
21 | + | |
22 | + T enrich(T value, String responseTopic, UUID requestId); | |
23 | + | |
24 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2018 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.kafka; | |
17 | + | |
18 | +import java.util.UUID; | |
19 | + | |
20 | +public interface TbKafkaRequestIdExtractor<T> { | |
21 | + | |
22 | + UUID extractRequestId(T value); | |
23 | + | |
24 | +} | ... | ... |
... | ... | @@ -94,18 +94,34 @@ public class TbKafkaRequestTemplate<Request, Response> { |
94 | 94 | ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval)); |
95 | 95 | responses.forEach(response -> { |
96 | 96 | Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); |
97 | + Response decocedResponse = null; | |
98 | + UUID requestId = null; | |
97 | 99 | if (requestIdHeader == null) { |
98 | - log.error("[{}] Missing requestIdHeader", response); | |
99 | - } | |
100 | - UUID requestId = bytesToUuid(requestIdHeader.value()); | |
101 | - ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId); | |
102 | - if (expectedResponse == null) { | |
103 | - log.trace("[{}] Invalid or stale request", requestId); | |
104 | - } else { | |
105 | 100 | try { |
106 | - expectedResponse.future.set(responseTemplate.decode(response)); | |
101 | + decocedResponse = responseTemplate.decode(response); | |
102 | + requestId = responseTemplate.extractRequestId(decocedResponse); | |
103 | + | |
107 | 104 | } catch (IOException e) { |
108 | - expectedResponse.future.setException(e); | |
105 | + log.error("Failed to decode response", e); | |
106 | + } | |
107 | + } else { | |
108 | + requestId = bytesToUuid(requestIdHeader.value()); | |
109 | + } | |
110 | + if (requestId == null) { | |
111 | + log.error("[{}] Missing requestId in header and response", response); | |
112 | + } else { | |
113 | + ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId); | |
114 | + if (expectedResponse == null) { | |
115 | + log.trace("[{}] Invalid or stale request", requestId); | |
116 | + } else { | |
117 | + try { | |
118 | + if (decocedResponse == null) { | |
119 | + decocedResponse = responseTemplate.decode(response); | |
120 | + } | |
121 | + expectedResponse.future.set(decocedResponse); | |
122 | + } catch (IOException e) { | |
123 | + expectedResponse.future.setException(e); | |
124 | + } | |
109 | 125 | } |
110 | 126 | } |
111 | 127 | }); |
... | ... | @@ -144,6 +160,7 @@ public class TbKafkaRequestTemplate<Request, Response> { |
144 | 160 | headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()))); |
145 | 161 | SettableFuture<Response> future = SettableFuture.create(); |
146 | 162 | pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future)); |
163 | + request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); | |
147 | 164 | requestTemplate.send(key, request, headers); |
148 | 165 | return future; |
149 | 166 | } | ... | ... |
msa/js-executor/api/Utils.js
0 → 100644
1 | +/* | |
2 | + * Copyright © 2016-2018 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | + | |
17 | +'use strict'; | |
18 | + | |
19 | +const Long = require('long'); | |
20 | +const uuidParse = require('uuid-parse'); | |
21 | + | |
22 | +var logger = require('../config/logger')('Utils'); | |
23 | + | |
24 | +exports.toUUIDString = function(mostSigBits, leastSigBits) { | |
25 | + var msbBytes = Long.fromValue(mostSigBits, false).toBytes(false); | |
26 | + var lsbBytes = Long.fromValue(leastSigBits, false).toBytes(false); | |
27 | + var uuidBytes = msbBytes.concat(lsbBytes); | |
28 | + var buff = new Buffer(uuidBytes, 'utf8'); | |
29 | + return uuidParse.unparse(uuidBytes); | |
30 | +} | |
31 | + | |
32 | +exports.UUIDToBits = function(uuidString) { | |
33 | + const bytes = uuidParse.parse(uuidString); | |
34 | + var msb = Long.fromBytes(bytes.slice(0,8), false, false).toString(); | |
35 | + var lsb = Long.fromBytes(bytes.slice(-8), false, false).toString(); | |
36 | + return [msb, lsb]; | |
37 | +} | ... | ... |
... | ... | @@ -16,9 +16,10 @@ |
16 | 16 | |
17 | 17 | 'use strict'; |
18 | 18 | |
19 | -var logger = require('../config/logger')('JsMessageConsumer'); | |
20 | - | |
21 | -var js = require('./jsinvoke').js; | |
19 | +const logger = require('../config/logger')('JsMessageConsumer'); | |
20 | +const Utils = require('./Utils'); | |
21 | +const js = require('./jsinvoke.proto').js; | |
22 | +const KeyedMessage = require('kafka-node').KeyedMessage; | |
22 | 23 | |
23 | 24 | |
24 | 25 | exports.onJsInvokeMessage = function(message, producer) { |
... | ... | @@ -29,7 +30,15 @@ exports.onJsInvokeMessage = function(message, producer) { |
29 | 30 | |
30 | 31 | logger.info('Received request: %s', JSON.stringify(request)); |
31 | 32 | |
33 | + var requestId = getRequestId(request); | |
34 | + | |
35 | + logger.info('Received request, responseTopic: [%s]; requestId: [%s]', request.responseTopic, requestId); | |
36 | + | |
32 | 37 | if (request.compileRequest) { |
38 | + var scriptId = getScriptId(request.compileRequest); | |
39 | + | |
40 | + logger.info('Received compile request, scriptId: [%s]', scriptId); | |
41 | + | |
33 | 42 | var compileResponse = js.JsCompileResponse.create( |
34 | 43 | { |
35 | 44 | errorCode: js.JsInvokeErrorCode.COMPILATION_ERROR, |
... | ... | @@ -39,16 +48,31 @@ exports.onJsInvokeMessage = function(message, producer) { |
39 | 48 | scriptIdMSB: request.compileRequest.scriptIdMSB |
40 | 49 | } |
41 | 50 | ); |
51 | + const requestIdBits = Utils.UUIDToBits(requestId); | |
42 | 52 | var response = js.RemoteJsResponse.create( |
43 | 53 | { |
54 | + requestIdMSB: requestIdBits[0], | |
55 | + requestIdLSB: requestIdBits[1], | |
44 | 56 | compileResponse: compileResponse |
45 | 57 | } |
46 | 58 | ); |
47 | 59 | var rawResponse = js.RemoteJsResponse.encode(response).finish(); |
48 | - sendMessage(producer, rawResponse); | |
60 | + sendMessage(producer, rawResponse, request.responseTopic, scriptId); | |
49 | 61 | } |
50 | 62 | } |
51 | 63 | |
52 | -function sendMessage(producer, rawMessage) { | |
64 | +function sendMessage(producer, rawMessage, responseTopic, scriptId) { | |
65 | + const message = new KeyedMessage(scriptId, rawMessage); | |
66 | + const payloads = [ { topic: responseTopic, messages: rawMessage, key: scriptId } ]; | |
67 | + producer.send(payloads, function (err, data) { | |
68 | + console.log(data); | |
69 | + }); | |
70 | +} | |
71 | + | |
72 | +function getScriptId(request) { | |
73 | + return Utils.toUUIDString(request.scriptIdMSB, request.scriptIdLSB); | |
74 | +} | |
53 | 75 | |
76 | +function getRequestId(request) { | |
77 | + return Utils.toUUIDString(request.requestIdMSB, request.requestIdLSB); | |
54 | 78 | } | ... | ... |
... | ... | @@ -6,7 +6,7 @@ |
6 | 6 | "main": "server.js", |
7 | 7 | "bin": "server.js", |
8 | 8 | "scripts": { |
9 | - "build-proto": "pbjs -t static-module -w commonjs -o ./api/jsinvoke.js ../../application/src/main/proto/jsinvoke.proto", | |
9 | + "build-proto": "pbjs -t static-module -w commonjs -o ./api/jsinvoke.proto.js ../../application/src/main/proto/jsinvoke.proto", | |
10 | 10 | "install": "npm run build-proto && pkg -t node8-linux-x64,node8-win-x64 --out-path ./target . && node install.js", |
11 | 11 | "test": "echo \"Error: no test specified\" && exit 1", |
12 | 12 | "start": "npm run build-proto && nodemon server.js", |
... | ... | @@ -16,7 +16,9 @@ |
16 | 16 | "config": "^1.30.0", |
17 | 17 | "js-yaml": "^3.12.0", |
18 | 18 | "kafka-node": "^3.0.1", |
19 | + "long": "^4.0.0", | |
19 | 20 | "protobufjs": "^6.8.8", |
21 | + "uuid-parse": "^1.0.0", | |
20 | 22 | "winston": "^3.0.0", |
21 | 23 | "winston-daily-rotate-file": "^3.2.1" |
22 | 24 | }, | ... | ... |
... | ... | @@ -13,16 +13,17 @@ |
13 | 13 | * See the License for the specific language governing permissions and |
14 | 14 | * limitations under the License. |
15 | 15 | */ |
16 | -var config = require('config'), | |
17 | - kafka = require('kafka-node'), | |
18 | - Consumer = kafka.Consumer, | |
19 | - Producer = kafka.Producer, | |
20 | - JsMessageConsumer = require('./api/jsMessageConsumer'); | |
21 | 16 | |
22 | -var logger = require('./config/logger')('main'); | |
17 | +const config = require('config'), | |
18 | + kafka = require('kafka-node'), | |
19 | + Consumer = kafka.Consumer, | |
20 | + Producer = kafka.Producer, | |
21 | + JsMessageConsumer = require('./api/jsMessageConsumer'); | |
23 | 22 | |
24 | -var kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); | |
25 | -var kafkaRequestTopic = config.get('kafka.request_topic'); | |
23 | +const logger = require('./config/logger')('main'); | |
24 | + | |
25 | +const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); | |
26 | +const kafkaRequestTopic = config.get('kafka.request_topic'); | |
26 | 27 | |
27 | 28 | logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); |
28 | 29 | logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); | ... | ... |
... | ... | @@ -283,6 +283,7 @@ |
283 | 283 | <exclude>src/main/scripts/control/**</exclude> |
284 | 284 | <exclude>src/main/scripts/windows/**</exclude> |
285 | 285 | <exclude>src/main/resources/public/static/rulenode/**</exclude> |
286 | + <exclude>**/*.proto.js</exclude> | |
286 | 287 | </excludes> |
287 | 288 | <mapping> |
288 | 289 | <proto>JAVADOC_STYLE</proto> | ... | ... |