Commit 316043afaadd847a089da416724bdb727ce68ce2

Authored by Igor Kulikov
2 parents b8c13535 4c4923e3

Merge branch 'develop/2.2' of github.com:thingsboard/thingsboard into develop/2.2

... ... @@ -47,9 +47,6 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
47 47 @Autowired
48 48 private TbKafkaSettings kafkaSettings;
49 49
50   - @Value("${js.remote.use_js_sandbox}")
51   - private boolean useJsSandbox;
52   -
53 50 @Value("${js.remote.request_topic}")
54 51 private String requestTopic;
55 52
... ... @@ -99,8 +96,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
99 96 }
100 97
101 98 @PreDestroy
102   - public void destroy(){
103   - if(kafkaTemplate != null){
  99 + public void destroy() {
  100 + if (kafkaTemplate != null) {
104 101 kafkaTemplate.stop();
105 102 }
106 103 }
... ... @@ -138,14 +135,19 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
138 135 if (scriptBody == null) {
139 136 return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
140 137 }
141   - JsInvokeProtos.JsInvokeRequest jsRequest = JsInvokeProtos.JsInvokeRequest.newBuilder()
  138 + JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder()
142 139 .setScriptIdMSB(scriptId.getMostSignificantBits())
143 140 .setScriptIdLSB(scriptId.getLeastSignificantBits())
144 141 .setFunctionName(functionName)
145   - .setScriptBody(scriptIdToBodysMap.get(scriptId)).build();
  142 + .setTimeout((int) maxRequestsTimeout)
  143 + .setScriptBody(scriptIdToBodysMap.get(scriptId));
  144 +
  145 + for (int i = 0; i < args.length; i++) {
  146 + jsRequestBuilder.setArgs(i, args[i].toString());
  147 + }
146 148
147 149 JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
148   - .setInvokeRequest(jsRequest)
  150 + .setInvokeRequest(jsRequestBuilder.build())
149 151 .build();
150 152
151 153 ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
... ...
... ... @@ -19,10 +19,10 @@ package js;
19 19 option java_package = "org.thingsboard.server.gen.js";
20 20 option java_outer_classname = "JsInvokeProtos";
21 21
22   -enum JsInvokeErrorCode{
  22 +enum JsInvokeErrorCode {
23 23 COMPILATION_ERROR = 0;
24 24 RUNTIME_ERROR = 1;
25   - CPU_USAGE_ERROR = 2;
  25 + TIMEOUT_ERROR = 2;
26 26 }
27 27
28 28 message RemoteJsRequest {
... ... @@ -69,7 +69,8 @@ message JsInvokeRequest {
69 69 int64 scriptIdLSB = 2;
70 70 string functionName = 3;
71 71 string scriptBody = 4;
72   - repeated string args = 5;
  72 + int32 timeout = 5;
  73 + repeated string args = 6;
73 74 }
74 75
75 76 message JsInvokeResponse {
... ...
... ... @@ -415,7 +415,7 @@ kafka:
415 415 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
416 416
417 417 js:
418   - evaluator: "${JS_EVALUATOR:local}" # local/external
  418 + evaluator: "${JS_EVALUATOR:local}" # local/remote
419 419 # Built-in JVM JavaScript environment properties
420 420 local:
421 421 # Use Sandboxed (secured) JVM JavaScript environment
... ... @@ -428,8 +428,6 @@ js:
428 428 max_errors: "${LOCAL_JS_SANDBOX_MAX_ERRORS:3}"
429 429 # Remote JavaScript environment properties
430 430 remote:
431   - # Use Sandboxed (secured) JVM JavaScript environment
432   - use_js_sandbox: "${USE_REMOTE_JS_SANDBOX:true}"
433 431 # JS Eval request topic
434 432 request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
435 433 # JS Eval responses topic prefix that is combined with node id
... ... @@ -437,7 +435,7 @@ js:
437 435 # JS Eval max pending requests
438 436 max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
439 437 # JS Eval max request timeout
440   - max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:20000}"
  438 + max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
441 439 # JS response poll interval
442 440 response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
443 441 # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
... ...
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.kafka;
17   -
18   -import lombok.extern.slf4j.Slf4j;
19   -import org.apache.kafka.clients.consumer.ConsumerRecords;
20   -import org.apache.kafka.clients.producer.ProducerRecord;
21   -import org.apache.kafka.common.header.Header;
22   -
23   -import java.nio.charset.StandardCharsets;
24   -import java.util.concurrent.ExecutorService;
25   -import java.util.concurrent.Executors;
26   -import java.util.concurrent.atomic.LongAdder;
27   -
28   -/**
29   - * Created by ashvayka on 24.09.18.
30   - */
31   -@Slf4j
32   -public class TbJsEvaluator {
33   -
34   -// public static void main(String[] args) {
35   -// ExecutorService executorService = Executors.newCachedThreadPool();
36   -//
37   -// TBKafkaConsumerTemplate requestConsumer = new TBKafkaConsumerTemplate();
38   -// requestConsumer.subscribe("requests");
39   -//
40   -// LongAdder responseCounter = new LongAdder();
41   -// TBKafkaProducerTemplate responseProducer = new TBKafkaProducerTemplate();
42   -// executorService.submit((Runnable) () -> {
43   -// while (true) {
44   -// ConsumerRecords<String, String> requests = requestConsumer.poll(100);
45   -// requests.forEach(request -> {
46   -// Header header = request.headers().lastHeader("responseTopic");
47   -// ProducerRecord<String, String> response = new ProducerRecord<>(new String(header.value(), StandardCharsets.UTF_8),
48   -// request.key(), request.value());
49   -// responseProducer.send(response);
50   -// responseCounter.add(1);
51   -// });
52   -// }
53   -// });
54   -//
55   -// executorService.submit((Runnable) () -> {
56   -// while (true) {
57   -// log.warn("Requests: [{}], Responses: [{}]", responseCounter.longValue(), responseCounter.longValue());
58   -// try {
59   -// Thread.sleep(1000L);
60   -// } catch (InterruptedException e) {
61   -// e.printStackTrace();
62   -// }
63   -// }
64   -// });
65   -//
66   -// }
67   -
68   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.kafka;
17   -
18   -import lombok.extern.slf4j.Slf4j;
19   -import org.apache.kafka.clients.admin.CreateTopicsResult;
20   -import org.apache.kafka.clients.admin.NewTopic;
21   -import org.apache.kafka.clients.consumer.ConsumerRecords;
22   -import org.apache.kafka.clients.producer.ProducerRecord;
23   -import org.apache.kafka.common.header.Header;
24   -import org.apache.kafka.common.header.internals.RecordHeader;
25   -
26   -import java.nio.charset.StandardCharsets;
27   -import java.util.Collections;
28   -import java.util.List;
29   -import java.util.UUID;
30   -import java.util.concurrent.ConcurrentHashMap;
31   -import java.util.concurrent.ConcurrentMap;
32   -import java.util.concurrent.ExecutionException;
33   -import java.util.concurrent.ExecutorService;
34   -import java.util.concurrent.Executors;
35   -import java.util.concurrent.atomic.LongAdder;
36   -
37   -/**
38   - * Created by ashvayka on 24.09.18.
39   - */
40   -@Slf4j
41   -public class TbRuleEngineEmulator {
42   -//
43   -// public static void main(String[] args) throws InterruptedException, ExecutionException {
44   -// ConcurrentMap<String, String> pendingRequestsMap = new ConcurrentHashMap<>();
45   -//
46   -// ExecutorService executorService = Executors.newCachedThreadPool();
47   -//
48   -// String responseTopic = "server" + Math.abs((int) (5000.0 * Math.random()));
49   -// try {
50   -// TBKafkaAdmin admin = new TBKafkaAdmin();
51   -// CreateTopicsResult result = admin.createTopic(new NewTopic(responseTopic, 1, (short) 1));
52   -// result.all().get();
53   -// } catch (Exception e) {
54   -// log.warn("Failed to create topic: {}", e.getMessage(), e);
55   -// }
56   -//
57   -// List<Header> headers = Collections.singletonList(new RecordHeader("responseTopic", responseTopic.getBytes(StandardCharsets.UTF_8)));
58   -//
59   -// TBKafkaConsumerTemplate responseConsumer = new TBKafkaConsumerTemplate();
60   -// TBKafkaProducerTemplate requestProducer = new TBKafkaProducerTemplate();
61   -//
62   -// LongAdder requestCounter = new LongAdder();
63   -// LongAdder responseCounter = new LongAdder();
64   -//
65   -// responseConsumer.subscribe(responseTopic);
66   -// executorService.submit((Runnable) () -> {
67   -// while (true) {
68   -// ConsumerRecords<String, String> responses = responseConsumer.poll(100);
69   -// responses.forEach(response -> {
70   -// String expectedResponse = pendingRequestsMap.remove(response.key());
71   -// if (expectedResponse == null) {
72   -// log.error("[{}] Invalid request", response.key());
73   -// } else if (!expectedResponse.equals(response.value())) {
74   -// log.error("[{}] Invalid response: {} instead of {}", response.key(), response.value(), expectedResponse);
75   -// }
76   -// responseCounter.add(1);
77   -// });
78   -// }
79   -// });
80   -//
81   -// executorService.submit((Runnable) () -> {
82   -// int i = 0;
83   -// while (true) {
84   -// String requestId = UUID.randomUUID().toString();
85   -// String expectedResponse = UUID.randomUUID().toString();
86   -// pendingRequestsMap.put(requestId, expectedResponse);
87   -// requestProducer.send(new ProducerRecord<>("requests", null, requestId, expectedResponse, headers));
88   -// requestCounter.add(1);
89   -// i++;
90   -// if (i % 10000 == 0) {
91   -// try {
92   -// Thread.sleep(500L);
93   -// } catch (InterruptedException e) {
94   -// e.printStackTrace();
95   -// }
96   -// }
97   -// }
98   -// });
99   -//
100   -// executorService.submit((Runnable) () -> {
101   -// while (true) {
102   -// log.warn("Requests: [{}], Responses: [{}]", requestCounter.longValue(), responseCounter.longValue());
103   -// try {
104   -// Thread.sleep(1000L);
105   -// } catch (InterruptedException e) {
106   -// e.printStackTrace();
107   -// }
108   -// }
109   -// });
110   -//
111   -// Thread.sleep(60000);
112   -// }
113   -
114   -}