Commit b46d6b036c890358e77ff9ff0c9b2f0c5b97045a

Authored by Igor Kulikov
2 parents dba1c026 8c91b7fb

Merge branch 'master' of github.com:thingsboard/thingsboard

... ... @@ -154,7 +154,7 @@ class DefaultTbContext implements TbContext {
154 154
155 155 @Override
156 156 public ScriptEngine createJsScriptEngine(String script, String... argNames) {
157   - return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), script, argNames);
  157 + return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), nodeCtx.getSelf().getId(), script, argNames);
158 158 }
159 159
160 160 @Override
... ...
... ... @@ -276,7 +276,7 @@ public class RuleChainController extends BaseController {
276 276 String errorText = "";
277 277 ScriptEngine engine = null;
278 278 try {
279   - engine = new RuleNodeJsScriptEngine(jsSandboxService, script, argNames);
  279 + engine = new RuleNodeJsScriptEngine(jsSandboxService, getCurrentUser().getId(), script, argNames);
280 280 TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L);
281 281 switch (scriptType) {
282 282 case "update":
... ...
... ... @@ -13,7 +13,6 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -
17 16 package org.thingsboard.server.service.script;
18 17
19 18 import com.google.common.util.concurrent.Futures;
... ... @@ -21,8 +20,12 @@ import com.google.common.util.concurrent.ListenableFuture;
21 20 import delight.nashornsandbox.NashornSandbox;
22 21 import delight.nashornsandbox.NashornSandboxes;
23 22 import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
  23 +import lombok.Data;
  24 +import lombok.EqualsAndHashCode;
  25 +import lombok.Getter;
  26 +import lombok.RequiredArgsConstructor;
24 27 import lombok.extern.slf4j.Slf4j;
25   -import org.apache.commons.lang3.tuple.Pair;
  28 +import org.thingsboard.server.common.data.id.EntityId;
26 29
27 30 import javax.annotation.PostConstruct;
28 31 import javax.annotation.PreDestroy;
... ... @@ -44,9 +47,10 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
44 47 private ExecutorService monitorExecutorService;
45 48
46 49 private final Map<UUID, String> functionsMap = new ConcurrentHashMap<>();
47   - private final Map<UUID,AtomicInteger> blackListedFunctions = new ConcurrentHashMap<>();
48   - private final Map<String, Pair<UUID, AtomicInteger>> scriptToId = new ConcurrentHashMap<>();
49   - private final Map<UUID, AtomicInteger> scriptIdToCount = new ConcurrentHashMap<>();
  50 + private final Map<BlackListKey, BlackListInfo> blackListedFunctions = new ConcurrentHashMap<>();
  51 +
  52 + private final Map<String, ScriptInfo> scriptKeyToInfo = new ConcurrentHashMap<>();
  53 + private final Map<UUID, ScriptInfo> scriptIdToInfo = new ConcurrentHashMap<>();
50 54
51 55 @PostConstruct
52 56 public void init() {
... ... @@ -65,7 +69,7 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
65 69
66 70 @PreDestroy
67 71 public void stop() {
68   - if (monitorExecutorService != null) {
  72 + if (monitorExecutorService != null) {
69 73 monitorExecutorService.shutdownNow();
70 74 }
71 75 }
... ... @@ -80,90 +84,107 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
80 84
81 85 @Override
82 86 public ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames) {
83   - Pair<UUID, AtomicInteger> deduplicated = deduplicate(scriptType, scriptBody);
84   - UUID scriptId = deduplicated.getLeft();
85   - AtomicInteger duplicateCount = deduplicated.getRight();
86   -
87   - if(duplicateCount.compareAndSet(0, 1)) {
88   - String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
89   - String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
90   - try {
91   - if (useJsSandbox()) {
92   - sandbox.eval(jsScript);
93   - } else {
94   - engine.eval(jsScript);
  87 + ScriptInfo scriptInfo = deduplicate(scriptType, scriptBody);
  88 + UUID scriptId = scriptInfo.getId();
  89 + AtomicInteger duplicateCount = scriptInfo.getCount();
  90 +
  91 + synchronized (scriptInfo.getLock()) {
  92 + if (duplicateCount.compareAndSet(0, 1)) {
  93 + try {
  94 + evaluate(scriptId, scriptType, scriptBody, argNames);
  95 + } catch (Exception e) {
  96 + duplicateCount.decrementAndGet();
  97 + log.warn("Failed to compile JS script: {}", e.getMessage(), e);
  98 + return Futures.immediateFailedFuture(e);
95 99 }
96   - functionsMap.put(scriptId, functionName);
97   - } catch (Exception e) {
98   - duplicateCount.decrementAndGet();
99   - log.warn("Failed to compile JS script: {}", e.getMessage(), e);
100   - return Futures.immediateFailedFuture(e);
  100 + } else {
  101 + duplicateCount.incrementAndGet();
101 102 }
102   - } else {
103   - duplicateCount.incrementAndGet();
104 103 }
105 104 return Futures.immediateFuture(scriptId);
106 105 }
107 106
  107 + private void evaluate(UUID scriptId, JsScriptType scriptType, String scriptBody, String... argNames) throws ScriptException {
  108 + String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
  109 + String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
  110 + if (useJsSandbox()) {
  111 + sandbox.eval(jsScript);
  112 + } else {
  113 + engine.eval(jsScript);
  114 + }
  115 + functionsMap.put(scriptId, functionName);
  116 + }
  117 +
108 118 @Override
109   - public ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args) {
  119 + public ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args) {
110 120 String functionName = functionsMap.get(scriptId);
111 121 if (functionName == null) {
112   - return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
113   - }
114   - if (!isBlackListed(scriptId)) {
115   - try {
116   - Object result;
117   - if (useJsSandbox()) {
118   - result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
119   - } else {
120   - result = ((Invocable)engine).invokeFunction(functionName, args);
121   - }
122   - return Futures.immediateFuture(result);
123   - } catch (Exception e) {
124   - blackListedFunctions.computeIfAbsent(scriptId, key -> new AtomicInteger(0)).incrementAndGet();
125   - return Futures.immediateFailedFuture(e);
126   - }
  122 + String message = "No compiled script found for scriptId: [" + scriptId + "]!";
  123 + log.warn(message);
  124 + return Futures.immediateFailedFuture(new RuntimeException(message));
  125 + }
  126 +
  127 + BlackListInfo blackListInfo = blackListedFunctions.get(new BlackListKey(scriptId, entityId));
  128 + if (blackListInfo != null && blackListInfo.getCount() >= getMaxErrors()) {
  129 + RuntimeException throwable = new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!", blackListInfo.getCause());
  130 + throwable.printStackTrace();
  131 + return Futures.immediateFailedFuture(throwable);
  132 + }
  133 +
  134 + try {
  135 + return invoke(functionName, args);
  136 + } catch (Exception e) {
  137 + BlackListKey blackListKey = new BlackListKey(scriptId, entityId);
  138 + blackListedFunctions.computeIfAbsent(blackListKey, key -> new BlackListInfo()).incrementWithReason(e);
  139 + return Futures.immediateFailedFuture(e);
  140 + }
  141 + }
  142 +
  143 + private ListenableFuture<Object> invoke(String functionName, Object... args) throws ScriptException, NoSuchMethodException {
  144 + Object result;
  145 + if (useJsSandbox()) {
  146 + result = sandbox.getSandboxedInvocable().invokeFunction(functionName, args);
127 147 } else {
128   - return Futures.immediateFailedFuture(
129   - new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!"));
  148 + result = ((Invocable) engine).invokeFunction(functionName, args);
130 149 }
  150 + return Futures.immediateFuture(result);
131 151 }
132 152
133 153 @Override
134   - public ListenableFuture<Void> release(UUID scriptId) {
135   - AtomicInteger count = scriptIdToCount.get(scriptId);
136   - if(count != null) {
137   - if(count.decrementAndGet() > 0) {
  154 + public ListenableFuture<Void> release(UUID scriptId, EntityId entityId) {
  155 + ScriptInfo scriptInfo = scriptIdToInfo.get(scriptId);
  156 + if (scriptInfo == null) {
  157 + log.warn("Script release called for not existing script id [{}]", scriptId);
  158 + return Futures.immediateFuture(null);
  159 + }
  160 +
  161 + synchronized (scriptInfo.getLock()) {
  162 + int remainingDuplicates = scriptInfo.getCount().decrementAndGet();
  163 + if (remainingDuplicates > 0) {
138 164 return Futures.immediateFuture(null);
139 165 }
140   - }
141 166
142   - String functionName = functionsMap.get(scriptId);
143   - if (functionName != null) {
144   - try {
145   - if (useJsSandbox()) {
146   - sandbox.eval(functionName + " = undefined;");
147   - } else {
148   - engine.eval(functionName + " = undefined;");
  167 + String functionName = functionsMap.get(scriptId);
  168 + if (functionName != null) {
  169 + try {
  170 + if (useJsSandbox()) {
  171 + sandbox.eval(functionName + " = undefined;");
  172 + } else {
  173 + engine.eval(functionName + " = undefined;");
  174 + }
  175 + functionsMap.remove(scriptId);
  176 + blackListedFunctions.remove(new BlackListKey(scriptId, entityId));
  177 + } catch (ScriptException e) {
  178 + log.error("Could not release script [{}] [{}]", scriptId, remainingDuplicates);
  179 + return Futures.immediateFailedFuture(e);
149 180 }
150   - functionsMap.remove(scriptId);
151   - blackListedFunctions.remove(scriptId);
152   - } catch (ScriptException e) {
153   - return Futures.immediateFailedFuture(e);
  181 + } else {
  182 + log.warn("Function name do not exist for script [{}] [{}]", scriptId, remainingDuplicates);
154 183 }
155 184 }
156 185 return Futures.immediateFuture(null);
157 186 }
158 187
159   - private boolean isBlackListed(UUID scriptId) {
160   - if (blackListedFunctions.containsKey(scriptId)) {
161   - AtomicInteger errorCount = blackListedFunctions.get(scriptId);
162   - return errorCount.get() >= getMaxErrors();
163   - } else {
164   - return false;
165   - }
166   - }
167 188
168 189 private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
169 190 switch (scriptType) {
... ... @@ -174,15 +195,66 @@ public abstract class AbstractNashornJsSandboxService implements JsSandboxServic
174 195 }
175 196 }
176 197
177   - private Pair<UUID, AtomicInteger> deduplicate(JsScriptType scriptType, String scriptBody) {
178   - Pair<UUID, AtomicInteger> precomputed = Pair.of(UUID.randomUUID(), new AtomicInteger());
179   -
180   - Pair<UUID, AtomicInteger> pair = scriptToId.computeIfAbsent(deduplicateKey(scriptType, scriptBody), i -> precomputed);
181   - AtomicInteger duplicateCount = scriptIdToCount.computeIfAbsent(pair.getLeft(), i -> pair.getRight());
182   - return Pair.of(pair.getLeft(), duplicateCount);
  198 + private ScriptInfo deduplicate(JsScriptType scriptType, String scriptBody) {
  199 + ScriptInfo meta = ScriptInfo.preInit();
  200 + String key = deduplicateKey(scriptType, scriptBody);
  201 + ScriptInfo latestMeta = scriptKeyToInfo.computeIfAbsent(key, i -> meta);
  202 + return scriptIdToInfo.computeIfAbsent(latestMeta.getId(), i -> latestMeta);
183 203 }
184 204
185 205 private String deduplicateKey(JsScriptType scriptType, String scriptBody) {
186 206 return scriptType + "_" + scriptBody;
187 207 }
  208 +
  209 + @Getter
  210 + private static class ScriptInfo {
  211 + private final UUID id;
  212 + private final Object lock;
  213 + private final AtomicInteger count;
  214 +
  215 + ScriptInfo(UUID id, Object lock, AtomicInteger count) {
  216 + this.id = id;
  217 + this.lock = lock;
  218 + this.count = count;
  219 + }
  220 +
  221 + static ScriptInfo preInit() {
  222 + UUID preId = UUID.randomUUID();
  223 + AtomicInteger preCount = new AtomicInteger();
  224 + Object preLock = new Object();
  225 + return new ScriptInfo(preId, preLock, preCount);
  226 + }
  227 + }
  228 +
  229 + @EqualsAndHashCode
  230 + @Getter
  231 + @RequiredArgsConstructor
  232 + private static class BlackListKey {
  233 + private final UUID scriptId;
  234 + private final EntityId entityId;
  235 +
  236 + }
  237 +
  238 + @Data
  239 + private static class BlackListInfo {
  240 + private final AtomicInteger count;
  241 + private Exception ex;
  242 +
  243 + BlackListInfo() {
  244 + this.count = new AtomicInteger(0);
  245 + }
  246 +
  247 + void incrementWithReason(Exception e) {
  248 + count.incrementAndGet();
  249 + ex = e;
  250 + }
  251 +
  252 + int getCount() {
  253 + return count.get();
  254 + }
  255 +
  256 + Exception getCause() {
  257 + return ex;
  258 + }
  259 + }
188 260 }
... ...
... ... @@ -17,6 +17,7 @@
17 17 package org.thingsboard.server.service.script;
18 18
19 19 import com.google.common.util.concurrent.ListenableFuture;
  20 +import org.thingsboard.server.common.data.id.EntityId;
20 21
21 22 import java.util.UUID;
22 23
... ... @@ -24,8 +25,8 @@ public interface JsSandboxService {
24 25
25 26 ListenableFuture<UUID> eval(JsScriptType scriptType, String scriptBody, String... argNames);
26 27
27   - ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args);
  28 + ListenableFuture<Object> invokeFunction(UUID scriptId, EntityId entityId, Object... args);
28 29
29   - ListenableFuture<Void> release(UUID scriptId);
  30 + ListenableFuture<Void> release(UUID scriptId, EntityId entityId);
30 31
31 32 }
... ...
... ... @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
21 21 import com.google.common.collect.Sets;
22 22 import lombok.extern.slf4j.Slf4j;
23 23 import org.apache.commons.lang3.StringUtils;
  24 +import org.thingsboard.server.common.data.id.EntityId;
24 25 import org.thingsboard.server.common.msg.TbMsg;
25 26 import org.thingsboard.server.common.msg.TbMsgMetaData;
26 27
... ... @@ -39,9 +40,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
39 40 private final JsSandboxService sandboxService;
40 41
41 42 private final UUID scriptId;
  43 + private final EntityId entityId;
42 44
43   - public RuleNodeJsScriptEngine(JsSandboxService sandboxService, String script, String... argNames) {
  45 + public RuleNodeJsScriptEngine(JsSandboxService sandboxService, EntityId entityId, String script, String... argNames) {
44 46 this.sandboxService = sandboxService;
  47 + this.entityId = entityId;
45 48 try {
46 49 this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get();
47 50 } catch (Exception e) {
... ... @@ -162,20 +165,20 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
162 165 private JsonNode executeScript(TbMsg msg) throws ScriptException {
163 166 try {
164 167 String[] inArgs = prepareArgs(msg);
165   - String eval = sandboxService.invokeFunction(this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
  168 + String eval = sandboxService.invokeFunction(this.scriptId, this.entityId, inArgs[0], inArgs[1], inArgs[2]).get().toString();
166 169 return mapper.readTree(eval);
167 170 } catch (ExecutionException e) {
168 171 if (e.getCause() instanceof ScriptException) {
169 172 throw (ScriptException)e.getCause();
170 173 } else {
171   - throw new ScriptException("Failed to execute js script: " + e.getMessage());
  174 + throw new ScriptException(e);
172 175 }
173 176 } catch (Exception e) {
174   - throw new ScriptException("Failed to execute js script: " + e.getMessage());
  177 + throw new ScriptException(e);
175 178 }
176 179 }
177 180
178 181 public void destroy() {
179   - sandboxService.release(this.scriptId);
  182 + sandboxService.release(this.scriptId, this.entityId);
180 183 }
181 184 }
... ...
... ... @@ -17,7 +17,7 @@
17 17
18 18 -->
19 19 <!DOCTYPE configuration>
20   -<configuration>
  20 +<configuration scan="true" scanPeriod="10 seconds">
21 21
22 22 <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
23 23 <encoder>
... ...
... ... @@ -21,12 +21,18 @@ import org.junit.After;
21 21 import org.junit.Before;
22 22 import org.junit.Test;
23 23 import org.thingsboard.rule.engine.api.ScriptEngine;
  24 +import org.thingsboard.server.common.data.id.EntityId;
  25 +import org.thingsboard.server.common.data.id.RuleNodeId;
24 26 import org.thingsboard.server.common.msg.TbMsg;
25 27 import org.thingsboard.server.common.msg.TbMsgMetaData;
26 28
27 29 import javax.script.ScriptException;
28   -
  30 +import java.util.Map;
29 31 import java.util.Set;
  32 +import java.util.UUID;
  33 +import java.util.concurrent.*;
  34 +import java.util.concurrent.atomic.AtomicBoolean;
  35 +import java.util.concurrent.atomic.AtomicInteger;
30 36
31 37 import static org.junit.Assert.*;
32 38
... ... @@ -35,6 +41,8 @@ public class RuleNodeJsScriptEngineTest {
35 41 private ScriptEngine scriptEngine;
36 42 private TestNashornJsSandboxService jsSandboxService;
37 43
  44 + private EntityId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  45 +
38 46 @Before
39 47 public void beforeTest() throws Exception {
40 48 jsSandboxService = new TestNashornJsSandboxService(false, 1, 100, 3);
... ... @@ -48,7 +56,7 @@ public class RuleNodeJsScriptEngineTest {
48 56 @Test
49 57 public void msgCanBeUpdated() throws ScriptException {
50 58 String function = "metadata.temp = metadata.temp * 10; return {metadata: metadata};";
51   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
  59 + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
52 60
53 61 TbMsgMetaData metaData = new TbMsgMetaData();
54 62 metaData.putValue("temp", "7");
... ... @@ -65,7 +73,7 @@ public class RuleNodeJsScriptEngineTest {
65 73 @Test
66 74 public void newAttributesCanBeAddedInMsg() throws ScriptException {
67 75 String function = "metadata.newAttr = metadata.humidity - msg.passed; return {metadata: metadata};";
68   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
  76 + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
69 77 TbMsgMetaData metaData = new TbMsgMetaData();
70 78 metaData.putValue("temp", "7");
71 79 metaData.putValue("humidity", "99");
... ... @@ -81,7 +89,7 @@ public class RuleNodeJsScriptEngineTest {
81 89 @Test
82 90 public void payloadCanBeUpdated() throws ScriptException {
83 91 String function = "msg.passed = msg.passed * metadata.temp; msg.bigObj.newProp = 'Ukraine'; return {msg: msg};";
84   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
  92 + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
85 93 TbMsgMetaData metaData = new TbMsgMetaData();
86 94 metaData.putValue("temp", "7");
87 95 metaData.putValue("humidity", "99");
... ... @@ -99,7 +107,7 @@ public class RuleNodeJsScriptEngineTest {
99 107 @Test
100 108 public void metadataAccessibleForFilter() throws ScriptException {
101 109 String function = "return metadata.humidity < 15;";
102   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
  110 + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
103 111 TbMsgMetaData metaData = new TbMsgMetaData();
104 112 metaData.putValue("temp", "7");
105 113 metaData.putValue("humidity", "99");
... ... @@ -113,7 +121,7 @@ public class RuleNodeJsScriptEngineTest {
113 121 @Test
114 122 public void dataAccessibleForFilter() throws ScriptException {
115 123 String function = "return msg.passed < 15 && msg.name === 'Vit' && metadata.temp == 7 && msg.bigObj.prop == 42;";
116   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, function);
  124 + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, function);
117 125 TbMsgMetaData metaData = new TbMsgMetaData();
118 126 metaData.putValue("temp", "7");
119 127 metaData.putValue("humidity", "99");
... ... @@ -134,7 +142,7 @@ public class RuleNodeJsScriptEngineTest {
134 142 "};\n" +
135 143 "\n" +
136 144 "return nextRelation(metadata, msg);";
137   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode);
  145 + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode);
138 146 TbMsgMetaData metaData = new TbMsgMetaData();
139 147 metaData.putValue("temp", "10");
140 148 metaData.putValue("humidity", "99");
... ... @@ -156,7 +164,7 @@ public class RuleNodeJsScriptEngineTest {
156 164 "};\n" +
157 165 "\n" +
158 166 "return nextRelation(metadata, msg);";
159   - scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, jsCode);
  167 + scriptEngine = new RuleNodeJsScriptEngine(jsSandboxService, ruleNodeId, jsCode);
160 168 TbMsgMetaData metaData = new TbMsgMetaData();
161 169 metaData.putValue("temp", "10");
162 170 metaData.putValue("humidity", "99");
... ... @@ -168,4 +176,75 @@ public class RuleNodeJsScriptEngineTest {
168 176 scriptEngine.destroy();
169 177 }
170 178
  179 + @Test
  180 + public void concurrentReleasedCorrectly() throws InterruptedException, ExecutionException {
  181 + String code = "metadata.temp = metadata.temp * 10; return {metadata: metadata};";
  182 +
  183 + int repeat = 1000;
  184 + ExecutorService service = Executors.newFixedThreadPool(repeat);
  185 + Map<UUID, Object> scriptIds = new ConcurrentHashMap<>();
  186 + CountDownLatch startLatch = new CountDownLatch(repeat);
  187 + CountDownLatch finishLatch = new CountDownLatch(repeat);
  188 + AtomicInteger failedCount = new AtomicInteger(0);
  189 +
  190 + for (int i = 0; i < repeat; i++) {
  191 + service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code));
  192 + }
  193 +
  194 + finishLatch.await();
  195 + assertTrue(scriptIds.size() == 1);
  196 + assertTrue(failedCount.get() == 0);
  197 +
  198 + CountDownLatch nextStart = new CountDownLatch(repeat);
  199 + CountDownLatch nextFinish = new CountDownLatch(repeat);
  200 + for (int i = 0; i < repeat; i++) {
  201 + service.submit(() -> runScript(nextStart, nextFinish, failedCount, scriptIds, code));
  202 + }
  203 +
  204 + nextFinish.await();
  205 + assertTrue(scriptIds.size() == 1);
  206 + assertTrue(failedCount.get() == 0);
  207 + service.shutdownNow();
  208 + }
  209 +
  210 + @Test
  211 + public void concurrentFailedEvaluationShouldThrowException() throws InterruptedException {
  212 + String code = "metadata.temp = metadata.temp * 10; urn {metadata: metadata};";
  213 +
  214 + int repeat = 10000;
  215 + ExecutorService service = Executors.newFixedThreadPool(repeat);
  216 + Map<UUID, Object> scriptIds = new ConcurrentHashMap<>();
  217 + CountDownLatch startLatch = new CountDownLatch(repeat);
  218 + CountDownLatch finishLatch = new CountDownLatch(repeat);
  219 + AtomicInteger failedCount = new AtomicInteger(0);
  220 + for (int i = 0; i < repeat; i++) {
  221 + service.submit(() -> {
  222 + service.submit(() -> runScript(startLatch, finishLatch, failedCount, scriptIds, code));
  223 + });
  224 + }
  225 +
  226 + finishLatch.await();
  227 + assertTrue(scriptIds.isEmpty());
  228 + assertEquals(repeat, failedCount.get());
  229 + service.shutdownNow();
  230 + }
  231 +
  232 + private void runScript(CountDownLatch startLatch, CountDownLatch finishLatch, AtomicInteger failedCount,
  233 + Map<UUID, Object> scriptIds, String code) {
  234 + try {
  235 + for (int k = 0; k < 10; k++) {
  236 + startLatch.countDown();
  237 + startLatch.await();
  238 + UUID scriptId = jsSandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, code).get();
  239 + scriptIds.put(scriptId, new Object());
  240 + jsSandboxService.invokeFunction(scriptId, ruleNodeId, "{}", "{}", "TEXT").get();
  241 + jsSandboxService.release(scriptId, ruleNodeId).get();
  242 + }
  243 + } catch (Throwable th) {
  244 + failedCount.incrementAndGet();
  245 + } finally {
  246 + finishLatch.countDown();
  247 + }
  248 + }
  249 +
171 250 }
\ No newline at end of file
... ...
... ... @@ -26,10 +26,8 @@ import org.thingsboard.rule.engine.api.*;
26 26 import org.thingsboard.rule.engine.api.util.DonAsynchron;
27 27 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
28 28 import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
29   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
30 29 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
31 30 import org.thingsboard.server.common.data.kv.TsKvEntry;
32   -import org.thingsboard.server.common.data.kv.TsKvQuery;
33 31 import org.thingsboard.server.common.data.plugin.ComponentType;
34 32 import org.thingsboard.server.common.msg.TbMsg;
35 33
... ... @@ -39,8 +37,7 @@ import java.util.concurrent.TimeUnit;
39 37 import java.util.stream.Collectors;
40 38
41 39 import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
42   -import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.FETCH_MODE_ALL;
43   -import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.MAX_FETCH_SIZE;
  40 +import static org.thingsboard.rule.engine.metadata.TbGetTelemetryNodeConfiguration.*;
44 41 import static org.thingsboard.server.common.data.kv.Aggregation.NONE;
45 42
46 43 /**
... ... @@ -64,6 +61,7 @@ public class TbGetTelemetryNode implements TbNode {
64 61 private long endTsOffset;
65 62 private int limit;
66 63 private ObjectMapper mapper;
  64 + private String fetchMode;
67 65
68 66 @Override
69 67 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
... ... @@ -72,6 +70,7 @@ public class TbGetTelemetryNode implements TbNode {
72 70 startTsOffset = TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval());
73 71 endTsOffset = TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval());
74 72 limit = config.getFetchMode().equals(FETCH_MODE_ALL) ? MAX_FETCH_SIZE : 1;
  73 + fetchMode = config.getFetchMode();
75 74 mapper = new ObjectMapper();
76 75 mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
77 76 mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
... ... @@ -96,14 +95,18 @@ public class TbGetTelemetryNode implements TbNode {
96 95 }
97 96 }
98 97
99   - //TODO: handle direction;
100 98 private List<ReadTsKvQuery> buildQueries() {
101 99 long ts = System.currentTimeMillis();
102 100 long startTs = ts - startTsOffset;
103 101 long endTs = ts - endTsOffset;
104   -
  102 + String orderBy;
  103 + if (fetchMode.equals(FETCH_MODE_FIRST) || fetchMode.equals(FETCH_MODE_ALL)) {
  104 + orderBy = "ASC";
  105 + } else {
  106 + orderBy = "DESC";
  107 + }
105 108 return tsKeyNames.stream()
106   - .map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE))
  109 + .map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, limit, NONE, orderBy))
107 110 .collect(Collectors.toList());
108 111 }
109 112
... ... @@ -116,7 +119,7 @@ public class TbGetTelemetryNode implements TbNode {
116 119 }
117 120
118 121 for (String key : tsKeyNames) {
119   - if(resultNode.has(key)){
  122 + if (resultNode.has(key)) {
120 123 msg.getMetaData().putValue(key, resultNode.get(key).toString());
121 124 }
122 125 }
... ... @@ -127,11 +130,11 @@ public class TbGetTelemetryNode implements TbNode {
127 130 }
128 131
129 132 private void processArray(ObjectNode node, TsKvEntry entry) {
130   - if(node.has(entry.getKey())){
  133 + if (node.has(entry.getKey())) {
131 134 ArrayNode arrayNode = (ArrayNode) node.get(entry.getKey());
132 135 ObjectNode obj = buildNode(entry);
133 136 arrayNode.add(obj);
134   - }else {
  137 + } else {
135 138 ArrayNode arrayNode = mapper.createArrayNode();
136 139 ObjectNode obj = buildNode(entry);
137 140 arrayNode.add(obj);
... ...