Commit 4c4923e3f7904e1a2813c9b34a0445ba7998921a

Authored by Andrew Shvayka
1 parent c0668dcb

Timeout and Args support

@@ -47,9 +47,6 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @@ -47,9 +47,6 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
47 @Autowired 47 @Autowired
48 private TbKafkaSettings kafkaSettings; 48 private TbKafkaSettings kafkaSettings;
49 49
50 - @Value("${js.remote.use_js_sandbox}")  
51 - private boolean useJsSandbox;  
52 -  
53 @Value("${js.remote.request_topic}") 50 @Value("${js.remote.request_topic}")
54 private String requestTopic; 51 private String requestTopic;
55 52
@@ -99,8 +96,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @@ -99,8 +96,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
99 } 96 }
100 97
101 @PreDestroy 98 @PreDestroy
102 - public void destroy(){  
103 - if(kafkaTemplate != null){ 99 + public void destroy() {
  100 + if (kafkaTemplate != null) {
104 kafkaTemplate.stop(); 101 kafkaTemplate.stop();
105 } 102 }
106 } 103 }
@@ -138,14 +135,19 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @@ -138,14 +135,19 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
138 if (scriptBody == null) { 135 if (scriptBody == null) {
139 return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!")); 136 return Futures.immediateFailedFuture(new RuntimeException("No script body found for scriptId: [" + scriptId + "]!"));
140 } 137 }
141 - JsInvokeProtos.JsInvokeRequest jsRequest = JsInvokeProtos.JsInvokeRequest.newBuilder() 138 + JsInvokeProtos.JsInvokeRequest.Builder jsRequestBuilder = JsInvokeProtos.JsInvokeRequest.newBuilder()
142 .setScriptIdMSB(scriptId.getMostSignificantBits()) 139 .setScriptIdMSB(scriptId.getMostSignificantBits())
143 .setScriptIdLSB(scriptId.getLeastSignificantBits()) 140 .setScriptIdLSB(scriptId.getLeastSignificantBits())
144 .setFunctionName(functionName) 141 .setFunctionName(functionName)
145 - .setScriptBody(scriptIdToBodysMap.get(scriptId)).build(); 142 + .setTimeout((int) maxRequestsTimeout)
  143 + .setScriptBody(scriptIdToBodysMap.get(scriptId));
  144 +
  145 + for (int i = 0; i < args.length; i++) {
  146 + jsRequestBuilder.setArgs(i, args[i].toString());
  147 + }
146 148
147 JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() 149 JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder()
148 - .setInvokeRequest(jsRequest) 150 + .setInvokeRequest(jsRequestBuilder.build())
149 .build(); 151 .build();
150 152
151 ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); 153 ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
@@ -19,10 +19,10 @@ package js; @@ -19,10 +19,10 @@ package js;
19 option java_package = "org.thingsboard.server.gen.js"; 19 option java_package = "org.thingsboard.server.gen.js";
20 option java_outer_classname = "JsInvokeProtos"; 20 option java_outer_classname = "JsInvokeProtos";
21 21
22 -enum JsInvokeErrorCode{ 22 +enum JsInvokeErrorCode {
23 COMPILATION_ERROR = 0; 23 COMPILATION_ERROR = 0;
24 RUNTIME_ERROR = 1; 24 RUNTIME_ERROR = 1;
25 - CPU_USAGE_ERROR = 2; 25 + TIMEOUT_ERROR = 2;
26 } 26 }
27 27
28 message RemoteJsRequest { 28 message RemoteJsRequest {
@@ -69,7 +69,8 @@ message JsInvokeRequest { @@ -69,7 +69,8 @@ message JsInvokeRequest {
69 int64 scriptIdLSB = 2; 69 int64 scriptIdLSB = 2;
70 string functionName = 3; 70 string functionName = 3;
71 string scriptBody = 4; 71 string scriptBody = 4;
72 - repeated string args = 5; 72 + int32 timeout = 5;
  73 + repeated string args = 6;
73 } 74 }
74 75
75 message JsInvokeResponse { 76 message JsInvokeResponse {
@@ -415,7 +415,7 @@ kafka: @@ -415,7 +415,7 @@ kafka:
415 buffer.memory: "${TB_BUFFER_MEMORY:33554432}" 415 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
416 416
417 js: 417 js:
418 - evaluator: "${JS_EVALUATOR:local}" # local/external 418 + evaluator: "${JS_EVALUATOR:local}" # local/remote
419 # Built-in JVM JavaScript environment properties 419 # Built-in JVM JavaScript environment properties
420 local: 420 local:
421 # Use Sandboxed (secured) JVM JavaScript environment 421 # Use Sandboxed (secured) JVM JavaScript environment
@@ -428,8 +428,6 @@ js: @@ -428,8 +428,6 @@ js:
428 max_errors: "${LOCAL_JS_SANDBOX_MAX_ERRORS:3}" 428 max_errors: "${LOCAL_JS_SANDBOX_MAX_ERRORS:3}"
429 # Remote JavaScript environment properties 429 # Remote JavaScript environment properties
430 remote: 430 remote:
431 - # Use Sandboxed (secured) JVM JavaScript environment  
432 - use_js_sandbox: "${USE_REMOTE_JS_SANDBOX:true}"  
433 # JS Eval request topic 431 # JS Eval request topic
434 request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}" 432 request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
435 # JS Eval responses topic prefix that is combined with node id 433 # JS Eval responses topic prefix that is combined with node id
@@ -437,7 +435,7 @@ js: @@ -437,7 +435,7 @@ js:
437 # JS Eval max pending requests 435 # JS Eval max pending requests
438 max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}" 436 max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
439 # JS Eval max request timeout 437 # JS Eval max request timeout
440 - max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:20000}" 438 + max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
441 # JS response poll interval 439 # JS response poll interval
442 response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}" 440 response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
443 # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted 441 # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.kafka;  
17 -  
18 -import lombok.extern.slf4j.Slf4j;  
19 -import org.apache.kafka.clients.consumer.ConsumerRecords;  
20 -import org.apache.kafka.clients.producer.ProducerRecord;  
21 -import org.apache.kafka.common.header.Header;  
22 -  
23 -import java.nio.charset.StandardCharsets;  
24 -import java.util.concurrent.ExecutorService;  
25 -import java.util.concurrent.Executors;  
26 -import java.util.concurrent.atomic.LongAdder;  
27 -  
28 -/**  
29 - * Created by ashvayka on 24.09.18.  
30 - */  
31 -@Slf4j  
32 -public class TbJsEvaluator {  
33 -  
34 -// public static void main(String[] args) {  
35 -// ExecutorService executorService = Executors.newCachedThreadPool();  
36 -//  
37 -// TBKafkaConsumerTemplate requestConsumer = new TBKafkaConsumerTemplate();  
38 -// requestConsumer.subscribe("requests");  
39 -//  
40 -// LongAdder responseCounter = new LongAdder();  
41 -// TBKafkaProducerTemplate responseProducer = new TBKafkaProducerTemplate();  
42 -// executorService.submit((Runnable) () -> {  
43 -// while (true) {  
44 -// ConsumerRecords<String, String> requests = requestConsumer.poll(100);  
45 -// requests.forEach(request -> {  
46 -// Header header = request.headers().lastHeader("responseTopic");  
47 -// ProducerRecord<String, String> response = new ProducerRecord<>(new String(header.value(), StandardCharsets.UTF_8),  
48 -// request.key(), request.value());  
49 -// responseProducer.send(response);  
50 -// responseCounter.add(1);  
51 -// });  
52 -// }  
53 -// });  
54 -//  
55 -// executorService.submit((Runnable) () -> {  
56 -// while (true) {  
57 -// log.warn("Requests: [{}], Responses: [{}]", responseCounter.longValue(), responseCounter.longValue());  
58 -// try {  
59 -// Thread.sleep(1000L);  
60 -// } catch (InterruptedException e) {  
61 -// e.printStackTrace();  
62 -// }  
63 -// }  
64 -// });  
65 -//  
66 -// }  
67 -  
68 -}  
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.kafka;  
17 -  
18 -import lombok.extern.slf4j.Slf4j;  
19 -import org.apache.kafka.clients.admin.CreateTopicsResult;  
20 -import org.apache.kafka.clients.admin.NewTopic;  
21 -import org.apache.kafka.clients.consumer.ConsumerRecords;  
22 -import org.apache.kafka.clients.producer.ProducerRecord;  
23 -import org.apache.kafka.common.header.Header;  
24 -import org.apache.kafka.common.header.internals.RecordHeader;  
25 -  
26 -import java.nio.charset.StandardCharsets;  
27 -import java.util.Collections;  
28 -import java.util.List;  
29 -import java.util.UUID;  
30 -import java.util.concurrent.ConcurrentHashMap;  
31 -import java.util.concurrent.ConcurrentMap;  
32 -import java.util.concurrent.ExecutionException;  
33 -import java.util.concurrent.ExecutorService;  
34 -import java.util.concurrent.Executors;  
35 -import java.util.concurrent.atomic.LongAdder;  
36 -  
37 -/**  
38 - * Created by ashvayka on 24.09.18.  
39 - */  
40 -@Slf4j  
41 -public class TbRuleEngineEmulator {  
42 -//  
43 -// public static void main(String[] args) throws InterruptedException, ExecutionException {  
44 -// ConcurrentMap<String, String> pendingRequestsMap = new ConcurrentHashMap<>();  
45 -//  
46 -// ExecutorService executorService = Executors.newCachedThreadPool();  
47 -//  
48 -// String responseTopic = "server" + Math.abs((int) (5000.0 * Math.random()));  
49 -// try {  
50 -// TBKafkaAdmin admin = new TBKafkaAdmin();  
51 -// CreateTopicsResult result = admin.createTopic(new NewTopic(responseTopic, 1, (short) 1));  
52 -// result.all().get();  
53 -// } catch (Exception e) {  
54 -// log.warn("Failed to create topic: {}", e.getMessage(), e);  
55 -// }  
56 -//  
57 -// List<Header> headers = Collections.singletonList(new RecordHeader("responseTopic", responseTopic.getBytes(StandardCharsets.UTF_8)));  
58 -//  
59 -// TBKafkaConsumerTemplate responseConsumer = new TBKafkaConsumerTemplate();  
60 -// TBKafkaProducerTemplate requestProducer = new TBKafkaProducerTemplate();  
61 -//  
62 -// LongAdder requestCounter = new LongAdder();  
63 -// LongAdder responseCounter = new LongAdder();  
64 -//  
65 -// responseConsumer.subscribe(responseTopic);  
66 -// executorService.submit((Runnable) () -> {  
67 -// while (true) {  
68 -// ConsumerRecords<String, String> responses = responseConsumer.poll(100);  
69 -// responses.forEach(response -> {  
70 -// String expectedResponse = pendingRequestsMap.remove(response.key());  
71 -// if (expectedResponse == null) {  
72 -// log.error("[{}] Invalid request", response.key());  
73 -// } else if (!expectedResponse.equals(response.value())) {  
74 -// log.error("[{}] Invalid response: {} instead of {}", response.key(), response.value(), expectedResponse);  
75 -// }  
76 -// responseCounter.add(1);  
77 -// });  
78 -// }  
79 -// });  
80 -//  
81 -// executorService.submit((Runnable) () -> {  
82 -// int i = 0;  
83 -// while (true) {  
84 -// String requestId = UUID.randomUUID().toString();  
85 -// String expectedResponse = UUID.randomUUID().toString();  
86 -// pendingRequestsMap.put(requestId, expectedResponse);  
87 -// requestProducer.send(new ProducerRecord<>("requests", null, requestId, expectedResponse, headers));  
88 -// requestCounter.add(1);  
89 -// i++;  
90 -// if (i % 10000 == 0) {  
91 -// try {  
92 -// Thread.sleep(500L);  
93 -// } catch (InterruptedException e) {  
94 -// e.printStackTrace();  
95 -// }  
96 -// }  
97 -// }  
98 -// });  
99 -//  
100 -// executorService.submit((Runnable) () -> {  
101 -// while (true) {  
102 -// log.warn("Requests: [{}], Responses: [{}]", requestCounter.longValue(), responseCounter.longValue());  
103 -// try {  
104 -// Thread.sleep(1000L);  
105 -// } catch (InterruptedException e) {  
106 -// e.printStackTrace();  
107 -// }  
108 -// }  
109 -// });  
110 -//  
111 -// Thread.sleep(60000);  
112 -// }  
113 -  
114 -}