Commit 8915b279a027b16c1d45d2e9728ee5dcf474fa7d

Authored by Sergey Matvienko
1 parent 585f473b

js-script-engine-api: executeSwitch replaced with asynchronous executeSwitchAsync

@@ -400,7 +400,7 @@ public class RuleChainController extends BaseController { @@ -400,7 +400,7 @@ public class RuleChainController extends BaseController {
400 output = Boolean.toString(result); 400 output = Boolean.toString(result);
401 break; 401 break;
402 case "switch": 402 case "switch":
403 - Set<String> states = engine.executeSwitch(inMsg); 403 + Set<String> states = engine.executeSwitchAsync(inMsg).get(TIMEOUT, TimeUnit.SECONDS);
404 output = objectMapper.writeValueAsString(states); 404 output = objectMapper.writeValueAsString(states);
405 break; 405 break;
406 case "json": 406 case "json":
@@ -195,9 +195,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S @@ -195,9 +195,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
195 }, MoreExecutors.directExecutor()); 195 }, MoreExecutors.directExecutor());
196 } 196 }
197 197
198 - @Override  
199 - public Set<String> executeSwitch(TbMsg msg) throws ScriptException {  
200 - JsonNode result = executeScript(msg); 198 + Set<String> executeSwitchPostProcessFunction(JsonNode result) throws ScriptException {
201 if (result.isTextual()) { 199 if (result.isTextual()) {
202 return Collections.singleton(result.asText()); 200 return Collections.singleton(result.asText());
203 } else if (result.isArray()) { 201 } else if (result.isArray()) {
@@ -217,6 +215,14 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S @@ -217,6 +215,14 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
217 } 215 }
218 } 216 }
219 217
  218 + @Override
  219 + public ListenableFuture<Set<String>> executeSwitchAsync(TbMsg msg) {
  220 + log.trace("execute switch async, msg {}", msg);
  221 + return Futures.transformAsync(executeScriptAsync(msg),
  222 + result -> Futures.immediateFuture(executeSwitchPostProcessFunction(result)),
  223 + MoreExecutors.directExecutor()); //usually runs in a callbackExecutor
  224 + }
  225 +
220 private JsonNode executeScript(TbMsg msg) throws ScriptException { 226 private JsonNode executeScript(TbMsg msg) throws ScriptException {
221 try { 227 try {
222 String[] inArgs = prepareArgs(msg); 228 String[] inArgs = prepareArgs(msg);
@@ -35,7 +35,7 @@ public interface ScriptEngine { @@ -35,7 +35,7 @@ public interface ScriptEngine {
35 35
36 ListenableFuture<Boolean> executeFilterAsync(TbMsg msg); 36 ListenableFuture<Boolean> executeFilterAsync(TbMsg msg);
37 37
38 - Set<String> executeSwitch(TbMsg msg) throws ScriptException; 38 + ListenableFuture<Set<String>> executeSwitchAsync(TbMsg msg);
39 39
40 JsonNode executeJson(TbMsg msg) throws ScriptException; 40 JsonNode executeJson(TbMsg msg) throws ScriptException;
41 41
@@ -15,7 +15,11 @@ @@ -15,7 +15,11 @@
15 */ 15 */
16 package org.thingsboard.rule.engine.filter; 16 package org.thingsboard.rule.engine.filter;
17 17
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.MoreExecutors;
18 import lombok.extern.slf4j.Slf4j; 21 import lombok.extern.slf4j.Slf4j;
  22 +import org.checkerframework.checker.nullness.qual.Nullable;
19 import org.thingsboard.common.util.ListeningExecutor; 23 import org.thingsboard.common.util.ListeningExecutor;
20 import org.thingsboard.rule.engine.api.RuleNode; 24 import org.thingsboard.rule.engine.api.RuleNode;
21 import org.thingsboard.rule.engine.api.ScriptEngine; 25 import org.thingsboard.rule.engine.api.ScriptEngine;
@@ -58,17 +62,20 @@ public class TbJsSwitchNode implements TbNode { @@ -58,17 +62,20 @@ public class TbJsSwitchNode implements TbNode {
58 62
59 @Override 63 @Override
60 public void onMsg(TbContext ctx, TbMsg msg) { 64 public void onMsg(TbContext ctx, TbMsg msg) {
61 - ListeningExecutor jsExecutor = ctx.getJsExecutor();  
62 ctx.logJsEvalRequest(); 65 ctx.logJsEvalRequest();
63 - withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(msg)),  
64 - result -> {  
65 - ctx.logJsEvalResponse();  
66 - processSwitch(ctx, msg, result);  
67 - },  
68 - t -> {  
69 - ctx.logJsEvalFailure();  
70 - ctx.tellFailure(msg, t);  
71 - }, ctx.getDbCallbackExecutor()); 66 + Futures.addCallback(jsEngine.executeSwitchAsync(msg), new FutureCallback<Set<String>>() {
  67 + @Override
  68 + public void onSuccess(@Nullable Set<String> result) {
  69 + ctx.logJsEvalResponse();
  70 + processSwitch(ctx, msg, result);
  71 + }
  72 +
  73 + @Override
  74 + public void onFailure(Throwable t) {
  75 + ctx.logJsEvalFailure();
  76 + ctx.tellFailure(msg, t);
  77 + }
  78 + }, MoreExecutors.directExecutor()); //usually runs in a callbackExecutor
72 } 79 }
73 80
74 private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) { 81 private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) {
@@ -73,7 +73,7 @@ public class TbJsSwitchNodeTest { @@ -73,7 +73,7 @@ public class TbJsSwitchNodeTest {
73 73
74 TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, rawJson, ruleChainId, ruleNodeId); 74 TbMsg msg = TbMsg.newMsg( "USER", null, metaData, TbMsgDataType.JSON, rawJson, ruleChainId, ruleNodeId);
75 mockJsExecutor(); 75 mockJsExecutor();
76 - when(scriptEngine.executeSwitch(msg)).thenReturn(Sets.newHashSet("one", "three")); 76 + when(scriptEngine.executeSwitchAsync(msg)).thenReturn(Futures.immediateFuture(Sets.newHashSet("one", "three")));
77 77
78 node.onMsg(ctx, msg); 78 node.onMsg(ctx, msg);
79 verify(ctx).getJsExecutor(); 79 verify(ctx).getJsExecutor();