Commit da7da1ef76ce84e49f8169554cb9d984d099e64f

Authored by Andrew Shvayka
1 parent 88434dec

Async JS Execution

@@ -140,6 +140,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S @@ -140,6 +140,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
140 } 140 }
141 141
142 @Override 142 @Override
  143 + public ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) throws ScriptException {
  144 + return executeScriptAsync(msg);
  145 + }
  146 +
  147 + @Override
143 public String executeToString(TbMsg msg) throws ScriptException { 148 public String executeToString(TbMsg msg) throws ScriptException {
144 JsonNode result = executeScript(msg); 149 JsonNode result = executeScript(msg);
145 if (!result.isTextual()) { 150 if (!result.isTextual()) {
@@ -38,6 +38,8 @@ public interface ScriptEngine { @@ -38,6 +38,8 @@ public interface ScriptEngine {
38 38
39 JsonNode executeJson(TbMsg msg) throws ScriptException; 39 JsonNode executeJson(TbMsg msg) throws ScriptException;
40 40
  41 + ListenableFuture<JsonNode> executeJsonAsync(TbMsg msg) throws ScriptException;
  42 +
41 String executeToString(TbMsg msg) throws ScriptException; 43 String executeToString(TbMsg msg) throws ScriptException;
42 44
43 void destroy(); 45 void destroy();
@@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.action; @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.action;
17 17
18 import com.fasterxml.jackson.databind.JsonNode; 18 import com.fasterxml.jackson.databind.JsonNode;
19 import com.fasterxml.jackson.databind.ObjectMapper; 19 import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.ListenableFuture;
21 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
22 import org.thingsboard.rule.engine.api.ScriptEngine; 23 import org.thingsboard.rule.engine.api.ScriptEngine;
@@ -27,6 +28,9 @@ import org.thingsboard.rule.engine.api.TbNodeException; @@ -27,6 +28,9 @@ import org.thingsboard.rule.engine.api.TbNodeException;
27 import org.thingsboard.server.common.data.alarm.Alarm; 28 import org.thingsboard.server.common.data.alarm.Alarm;
28 import org.thingsboard.server.common.msg.TbMsg; 29 import org.thingsboard.server.common.msg.TbMsg;
29 import org.thingsboard.server.common.msg.TbMsgMetaData; 30 import org.thingsboard.server.common.msg.TbMsgMetaData;
  31 +
  32 +import javax.script.ScriptException;
  33 +
30 import static org.thingsboard.common.util.DonAsynchron.withCallback; 34 import static org.thingsboard.common.util.DonAsynchron.withCallback;
31 35
32 36
@@ -67,21 +71,24 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura @@ -67,21 +71,24 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura
67 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared"); 71 ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared");
68 } 72 }
69 }, 73 },
70 - t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); 74 + t -> ctx.tellFailure(msg, t)
  75 + , ctx.getDbCallbackExecutor());
71 } 76 }
72 77
73 protected abstract ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg); 78 protected abstract ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg);
74 79
75 protected ListenableFuture<JsonNode> buildAlarmDetails(TbContext ctx, TbMsg msg, JsonNode previousDetails) { 80 protected ListenableFuture<JsonNode> buildAlarmDetails(TbContext ctx, TbMsg msg, JsonNode previousDetails) {
76 - return ctx.getJsExecutor().executeAsync(() -> { 81 + try {
77 TbMsg dummyMsg = msg; 82 TbMsg dummyMsg = msg;
78 if (previousDetails != null) { 83 if (previousDetails != null) {
79 TbMsgMetaData metaData = msg.getMetaData().copy(); 84 TbMsgMetaData metaData = msg.getMetaData().copy();
80 metaData.putValue(PREV_ALARM_DETAILS, mapper.writeValueAsString(previousDetails)); 85 metaData.putValue(PREV_ALARM_DETAILS, mapper.writeValueAsString(previousDetails));
81 dummyMsg = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), metaData, msg.getData()); 86 dummyMsg = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), metaData, msg.getData());
82 } 87 }
83 - return buildDetailsJsEngine.executeJson(dummyMsg);  
84 - }); 88 + return buildDetailsJsEngine.executeJsonAsync(dummyMsg);
  89 + } catch (Exception e) {
  90 + return Futures.immediateFailedFuture(e);
  91 + }
85 } 92 }
86 93
87 private TbMsg toAlarmMsg(TbContext ctx, AlarmResult alarmResult, TbMsg originalMsg) { 94 private TbMsg toAlarmMsg(TbContext ctx, AlarmResult alarmResult, TbMsg originalMsg) {
@@ -26,6 +26,7 @@ import org.junit.Test; @@ -26,6 +26,7 @@ import org.junit.Test;
26 import org.junit.runner.RunWith; 26 import org.junit.runner.RunWith;
27 import org.mockito.ArgumentCaptor; 27 import org.mockito.ArgumentCaptor;
28 import org.mockito.Mock; 28 import org.mockito.Mock;
  29 +import org.mockito.Mockito;
29 import org.mockito.runners.MockitoJUnitRunner; 30 import org.mockito.runners.MockitoJUnitRunner;
30 import org.mockito.stubbing.Answer; 31 import org.mockito.stubbing.Answer;
31 import org.thingsboard.common.util.ListeningExecutor; 32 import org.thingsboard.common.util.ListeningExecutor;
@@ -60,8 +61,6 @@ public class TbAlarmNodeTest { @@ -60,8 +61,6 @@ public class TbAlarmNodeTest {
60 @Mock 61 @Mock
61 private TbContext ctx; 62 private TbContext ctx;
62 @Mock 63 @Mock
63 - private ListeningExecutor executor;  
64 - @Mock  
65 private AlarmService alarmService; 64 private AlarmService alarmService;
66 65
67 @Mock 66 @Mock
@@ -102,7 +101,7 @@ public class TbAlarmNodeTest { @@ -102,7 +101,7 @@ public class TbAlarmNodeTest {
102 metaData.putValue("key", "value"); 101 metaData.putValue("key", "value");
103 TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L); 102 TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
104 103
105 - when(detailsJs.executeJson(msg)).thenReturn(null); 104 + when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
106 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null)); 105 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null));
107 106
108 doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class)); 107 doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class));
@@ -136,8 +135,6 @@ public class TbAlarmNodeTest { @@ -136,8 +135,6 @@ public class TbAlarmNodeTest {
136 .build(); 135 .build();
137 136
138 assertEquals(expectedAlarm, actualAlarm); 137 assertEquals(expectedAlarm, actualAlarm);
139 -  
140 - verify(executor, times(1)).executeAsync(any(Callable.class));  
141 } 138 }
142 139
143 @Test 140 @Test
@@ -146,7 +143,7 @@ public class TbAlarmNodeTest { @@ -146,7 +143,7 @@ public class TbAlarmNodeTest {
146 metaData.putValue("key", "value"); 143 metaData.putValue("key", "value");
147 TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L); 144 TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
148 145
149 - when(detailsJs.executeJson(msg)).thenThrow(new NotImplementedException("message")); 146 + when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFailedFuture(new NotImplementedException("message")));
150 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null)); 147 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null));
151 148
152 node.onMsg(ctx, msg); 149 node.onMsg(ctx, msg);
@@ -154,7 +151,6 @@ public class TbAlarmNodeTest { @@ -154,7 +151,6 @@ public class TbAlarmNodeTest {
154 verifyError(msg, "message", NotImplementedException.class); 151 verifyError(msg, "message", NotImplementedException.class);
155 152
156 verify(ctx).createJsScriptEngine("DETAILS"); 153 verify(ctx).createJsScriptEngine("DETAILS");
157 - verify(ctx, times(1)).getJsExecutor();  
158 verify(ctx).getAlarmService(); 154 verify(ctx).getAlarmService();
159 verify(ctx, times(3)).getDbCallbackExecutor(); 155 verify(ctx, times(3)).getDbCallbackExecutor();
160 verify(ctx).logJsEvalRequest(); 156 verify(ctx).logJsEvalRequest();
@@ -172,7 +168,7 @@ public class TbAlarmNodeTest { @@ -172,7 +168,7 @@ public class TbAlarmNodeTest {
172 168
173 Alarm clearedAlarm = Alarm.builder().status(CLEARED_ACK).build(); 169 Alarm clearedAlarm = Alarm.builder().status(CLEARED_ACK).build();
174 170
175 - when(detailsJs.executeJson(msg)).thenReturn(null); 171 + when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
176 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(clearedAlarm)); 172 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(clearedAlarm));
177 173
178 doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class)); 174 doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class));
@@ -207,8 +203,6 @@ public class TbAlarmNodeTest { @@ -207,8 +203,6 @@ public class TbAlarmNodeTest {
207 .build(); 203 .build();
208 204
209 assertEquals(expectedAlarm, actualAlarm); 205 assertEquals(expectedAlarm, actualAlarm);
210 -  
211 - verify(executor, times(1)).executeAsync(any(Callable.class));  
212 } 206 }
213 207
214 @Test 208 @Test
@@ -220,7 +214,7 @@ public class TbAlarmNodeTest { @@ -220,7 +214,7 @@ public class TbAlarmNodeTest {
220 long oldEndDate = System.currentTimeMillis(); 214 long oldEndDate = System.currentTimeMillis();
221 Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build(); 215 Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
222 216
223 - when(detailsJs.executeJson(msg)).thenReturn(null); 217 + when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
224 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm)); 218 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm));
225 219
226 doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm); 220 doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm);
@@ -256,8 +250,6 @@ public class TbAlarmNodeTest { @@ -256,8 +250,6 @@ public class TbAlarmNodeTest {
256 .build(); 250 .build();
257 251
258 assertEquals(expectedAlarm, actualAlarm); 252 assertEquals(expectedAlarm, actualAlarm);
259 -  
260 - verify(executor, times(1)).executeAsync(any(Callable.class));  
261 } 253 }
262 254
263 @Test 255 @Test
@@ -269,7 +261,7 @@ public class TbAlarmNodeTest { @@ -269,7 +261,7 @@ public class TbAlarmNodeTest {
269 long oldEndDate = System.currentTimeMillis(); 261 long oldEndDate = System.currentTimeMillis();
270 Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build(); 262 Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
271 263
272 -// when(detailsJs.executeJson(msg)).thenReturn(null); 264 + when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
273 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm)); 265 when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(activeAlarm));
274 when(alarmService.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), org.mockito.Mockito.any(JsonNode.class), anyLong())).thenReturn(Futures.immediateFuture(true)); 266 when(alarmService.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), org.mockito.Mockito.any(JsonNode.class), anyLong())).thenReturn(Futures.immediateFuture(true));
275 when(alarmService.findAlarmByIdAsync(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()))).thenReturn(Futures.immediateFuture(activeAlarm)); 267 when(alarmService.findAlarmByIdAsync(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()))).thenReturn(Futures.immediateFuture(activeAlarm));
@@ -320,12 +312,9 @@ public class TbAlarmNodeTest { @@ -320,12 +312,9 @@ public class TbAlarmNodeTest {
320 when(ctx.createJsScriptEngine("DETAILS")).thenReturn(detailsJs); 312 when(ctx.createJsScriptEngine("DETAILS")).thenReturn(detailsJs);
321 313
322 when(ctx.getTenantId()).thenReturn(tenantId); 314 when(ctx.getTenantId()).thenReturn(tenantId);
323 - when(ctx.getJsExecutor()).thenReturn(executor);  
324 when(ctx.getAlarmService()).thenReturn(alarmService); 315 when(ctx.getAlarmService()).thenReturn(alarmService);
325 when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor); 316 when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor);
326 317
327 - mockJsExecutor();  
328 -  
329 node = new TbCreateAlarmNode(); 318 node = new TbCreateAlarmNode();
330 node.init(ctx, nodeConfiguration); 319 node.init(ctx, nodeConfiguration);
331 } catch (TbNodeException ex) { 320 } catch (TbNodeException ex) {
@@ -344,12 +333,9 @@ public class TbAlarmNodeTest { @@ -344,12 +333,9 @@ public class TbAlarmNodeTest {
344 when(ctx.createJsScriptEngine("DETAILS")).thenReturn(detailsJs); 333 when(ctx.createJsScriptEngine("DETAILS")).thenReturn(detailsJs);
345 334
346 when(ctx.getTenantId()).thenReturn(tenantId); 335 when(ctx.getTenantId()).thenReturn(tenantId);
347 - when(ctx.getJsExecutor()).thenReturn(executor);  
348 when(ctx.getAlarmService()).thenReturn(alarmService); 336 when(ctx.getAlarmService()).thenReturn(alarmService);
349 when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor); 337 when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor);
350 338
351 - mockJsExecutor();  
352 -  
353 node = new TbClearAlarmNode(); 339 node = new TbClearAlarmNode();
354 node.init(ctx, nodeConfiguration); 340 node.init(ctx, nodeConfiguration);
355 } catch (TbNodeException ex) { 341 } catch (TbNodeException ex) {
@@ -357,18 +343,6 @@ public class TbAlarmNodeTest { @@ -357,18 +343,6 @@ public class TbAlarmNodeTest {
357 } 343 }
358 } 344 }
359 345
360 - private void mockJsExecutor() {  
361 - when(ctx.getJsExecutor()).thenReturn(executor);  
362 - doAnswer((Answer<ListenableFuture<Boolean>>) invocationOnMock -> {  
363 - try {  
364 - Callable task = (Callable) (invocationOnMock.getArguments())[0];  
365 - return Futures.immediateFuture((Boolean) task.call());  
366 - } catch (Throwable th) {  
367 - return Futures.immediateFailedFuture(th);  
368 - }  
369 - }).when(executor).executeAsync(any(Callable.class));  
370 - }  
371 -  
372 private void verifyError(TbMsg msg, String message, Class expectedClass) { 346 private void verifyError(TbMsg msg, String message, Class expectedClass) {
373 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); 347 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
374 verify(ctx).tellFailure(same(msg), captor.capture()); 348 verify(ctx).tellFailure(same(msg), captor.capture());