Showing
16 changed files
with
273 additions
and
20 deletions
... | ... | @@ -33,6 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired; |
33 | 33 | import org.springframework.beans.factory.annotation.Value; |
34 | 34 | import org.springframework.context.annotation.Lazy; |
35 | 35 | import org.springframework.data.redis.core.RedisTemplate; |
36 | +import org.springframework.scheduling.annotation.Scheduled; | |
36 | 37 | import org.springframework.stereotype.Component; |
37 | 38 | import org.thingsboard.rule.engine.api.MailService; |
38 | 39 | import org.thingsboard.rule.engine.api.RuleChainTransactionService; |
... | ... | @@ -89,6 +90,7 @@ import java.io.StringWriter; |
89 | 90 | import java.util.Optional; |
90 | 91 | import java.util.concurrent.ConcurrentHashMap; |
91 | 92 | import java.util.concurrent.ConcurrentMap; |
93 | +import java.util.concurrent.atomic.AtomicInteger; | |
92 | 94 | |
93 | 95 | @Slf4j |
94 | 96 | @Component |
... | ... | @@ -286,6 +288,21 @@ public class ActorSystemContext { |
286 | 288 | @Getter |
287 | 289 | private long statisticsPersistFrequency; |
288 | 290 | |
291 | + @Getter | |
292 | + private final AtomicInteger jsInvokeRequestsCount = new AtomicInteger(0); | |
293 | + @Getter | |
294 | + private final AtomicInteger jsInvokeResponsesCount = new AtomicInteger(0); | |
295 | + @Getter | |
296 | + private final AtomicInteger jsInvokeFailuresCount = new AtomicInteger(0); | |
297 | + | |
298 | + @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") | |
299 | + public void printStats() { | |
300 | + if (statisticsEnabled) { | |
301 | + log.info("Rule Engine JS Invoke Stats: requests [{}] responses [{}] failures [{}]", | |
302 | + jsInvokeRequestsCount.getAndSet(0), jsInvokeResponsesCount.getAndSet(0), jsInvokeFailuresCount.getAndSet(0)); | |
303 | + } | |
304 | + } | |
305 | + | |
289 | 306 | @Value("${actors.tenant.create_components_on_init}") |
290 | 307 | @Getter |
291 | 308 | private boolean tenantComponentsInitEnabled; | ... | ... |
... | ... | @@ -230,6 +230,27 @@ class DefaultTbContext implements TbContext { |
230 | 230 | } |
231 | 231 | |
232 | 232 | @Override |
233 | + public void logJsEvalRequest() { | |
234 | + if (mainCtx.isStatisticsEnabled()) { | |
235 | + mainCtx.getJsInvokeRequestsCount().incrementAndGet(); | |
236 | + } | |
237 | + } | |
238 | + | |
239 | + @Override | |
240 | + public void logJsEvalResponse() { | |
241 | + if (mainCtx.isStatisticsEnabled()) { | |
242 | + mainCtx.getJsInvokeResponsesCount().incrementAndGet(); | |
243 | + } | |
244 | + } | |
245 | + | |
246 | + @Override | |
247 | + public void logJsEvalFailure() { | |
248 | + if (mainCtx.isStatisticsEnabled()) { | |
249 | + mainCtx.getJsInvokeFailuresCount().incrementAndGet(); | |
250 | + } | |
251 | + } | |
252 | + | |
253 | + @Override | |
233 | 254 | public String getNodeId() { |
234 | 255 | return mainCtx.getNodeIdProvider().getNodeId(); |
235 | 256 | } | ... | ... |
... | ... | @@ -131,15 +131,21 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService { |
131 | 131 | private void addNode(ServerInstance instance) { |
132 | 132 | for (int i = 0; i < virtualNodesSize; i++) { |
133 | 133 | circles[instance.getServerAddress().getServerType().ordinal()].put(hash(instance, i).asLong(), instance); |
134 | +// circles[instance.getServerAddress().getServerType().ordinal()].put(classic(instance, i), instance); | |
134 | 135 | } |
135 | 136 | } |
136 | 137 | |
137 | 138 | private void removeNode(ServerInstance instance) { |
138 | 139 | for (int i = 0; i < virtualNodesSize; i++) { |
139 | 140 | circles[instance.getServerAddress().getServerType().ordinal()].remove(hash(instance, i).asLong()); |
141 | +// circles[instance.getServerAddress().getServerType().ordinal()].remove(classic(instance, i)); | |
140 | 142 | } |
141 | 143 | } |
142 | 144 | |
145 | + private long classic(ServerInstance instance, int i) { | |
146 | + return (instance.getHost() + instance.getPort() + i).hashCode() * (Long.MAX_VALUE / Integer.MAX_VALUE); | |
147 | + } | |
148 | + | |
143 | 149 | private HashCode hash(ServerInstance instance, int i) { |
144 | 150 | return hashFunction.newHasher().putString(instance.getHost(), MiscUtils.UTF8).putInt(instance.getPort()).putInt(i).hash(); |
145 | 151 | } | ... | ... |
... | ... | @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; |
22 | 22 | import org.springframework.beans.factory.annotation.Autowired; |
23 | 23 | import org.springframework.beans.factory.annotation.Value; |
24 | 24 | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
25 | +import org.springframework.scheduling.annotation.Scheduled; | |
25 | 26 | import org.springframework.stereotype.Service; |
26 | 27 | import org.thingsboard.server.gen.js.JsInvokeProtos; |
27 | 28 | import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; |
... | ... | @@ -29,13 +30,14 @@ import org.thingsboard.server.kafka.TBKafkaProducerTemplate; |
29 | 30 | import org.thingsboard.server.kafka.TbKafkaRequestTemplate; |
30 | 31 | import org.thingsboard.server.kafka.TbKafkaSettings; |
31 | 32 | import org.thingsboard.server.kafka.TbNodeIdProvider; |
32 | -import org.thingsboard.server.service.cluster.discovery.DiscoveryService; | |
33 | 33 | |
34 | 34 | import javax.annotation.PostConstruct; |
35 | 35 | import javax.annotation.PreDestroy; |
36 | 36 | import java.util.Map; |
37 | 37 | import java.util.UUID; |
38 | 38 | import java.util.concurrent.ConcurrentHashMap; |
39 | +import java.util.concurrent.atomic.AtomicInteger; | |
40 | +import java.util.concurrent.atomic.AtomicLong; | |
39 | 41 | |
40 | 42 | @Slf4j |
41 | 43 | @ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true) |
... | ... | @@ -70,6 +72,25 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
70 | 72 | @Value("${js.remote.max_errors}") |
71 | 73 | private int maxErrors; |
72 | 74 | |
75 | + @Value("${js.remote.stats.enabled:false}") | |
76 | + private boolean statsEnabled; | |
77 | + | |
78 | + private final AtomicInteger kafkaPushedMsgs = new AtomicInteger(0); | |
79 | + private final AtomicInteger kafkaInvokeMsgs = new AtomicInteger(0); | |
80 | + private final AtomicInteger kafkaEvalMsgs = new AtomicInteger(0); | |
81 | + private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0); | |
82 | + | |
83 | + @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") | |
84 | + public void printStats() { | |
85 | + if (statsEnabled) { | |
86 | + int invokeMsgs = kafkaInvokeMsgs.getAndSet(0); | |
87 | + int evalMsgs = kafkaEvalMsgs.getAndSet(0); | |
88 | + int failed = kafkaFailedMsgs.getAndSet(0); | |
89 | + log.info("Kafka JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}]", | |
90 | + kafkaPushedMsgs.getAndSet(0), invokeMsgs + evalMsgs, invokeMsgs, evalMsgs, failed); | |
91 | + } | |
92 | + } | |
93 | + | |
73 | 94 | private TbKafkaRequestTemplate<JsInvokeProtos.RemoteJsRequest, JsInvokeProtos.RemoteJsResponse> kafkaTemplate; |
74 | 95 | private Map<UUID, String> scriptIdToBodysMap = new ConcurrentHashMap<>(); |
75 | 96 | |
... | ... | @@ -139,14 +160,17 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
139 | 160 | |
140 | 161 | log.trace("Post compile request for scriptId [{}]", scriptId); |
141 | 162 | ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); |
163 | + kafkaPushedMsgs.incrementAndGet(); | |
142 | 164 | return Futures.transform(future, response -> { |
143 | 165 | JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse(); |
144 | 166 | UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); |
145 | 167 | if (compilationResult.getSuccess()) { |
168 | + kafkaEvalMsgs.incrementAndGet(); | |
146 | 169 | scriptIdToNameMap.put(scriptId, functionName); |
147 | 170 | scriptIdToBodysMap.put(scriptId, scriptBody); |
148 | 171 | return compiledScriptId; |
149 | 172 | } else { |
173 | + kafkaFailedMsgs.incrementAndGet(); | |
150 | 174 | log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); |
151 | 175 | throw new RuntimeException(compilationResult.getErrorDetails()); |
152 | 176 | } |
... | ... | @@ -174,12 +198,16 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { |
174 | 198 | .setInvokeRequest(jsRequestBuilder.build()) |
175 | 199 | .build(); |
176 | 200 | |
201 | + | |
177 | 202 | ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); |
203 | + kafkaPushedMsgs.incrementAndGet(); | |
178 | 204 | return Futures.transform(future, response -> { |
179 | 205 | JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse(); |
180 | 206 | if (invokeResult.getSuccess()) { |
207 | + kafkaInvokeMsgs.incrementAndGet(); | |
181 | 208 | return invokeResult.getResult(); |
182 | 209 | } else { |
210 | + kafkaFailedMsgs.incrementAndGet(); | |
183 | 211 | log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); |
184 | 212 | throw new RuntimeException(invokeResult.getErrorDetails()); |
185 | 213 | } | ... | ... |
... | ... | @@ -246,6 +246,7 @@ actors: |
246 | 246 | statistics: |
247 | 247 | # Enable/disable actor statistics |
248 | 248 | enabled: "${ACTORS_STATISTICS_ENABLED:true}" |
249 | + js_print_interval_ms: "${ACTORS_JS_STATISTICS_PRINT_INTERVAL_MS:10000}" | |
249 | 250 | persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}" |
250 | 251 | queue: |
251 | 252 | # Enable/disable persistence of un-processed messages to the queue |
... | ... | @@ -467,6 +468,9 @@ js: |
467 | 468 | response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" |
468 | 469 | # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted |
469 | 470 | max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}" |
471 | + stats: | |
472 | + enabled: "${TB_JS_REMOTE_STATS_ENABLED:false}" | |
473 | + print_interval_ms: "${TB_JS_REMOTE_STATS_PRINT_INTERVAL_MS:10000}" | |
470 | 474 | |
471 | 475 | transport: |
472 | 476 | type: "${TRANSPORT_TYPE:local}" # local or remote | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2019 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.service.cluster.routing; | |
17 | + | |
18 | +import com.datastax.driver.core.utils.UUIDs; | |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.junit.Before; | |
21 | +import org.junit.Test; | |
22 | +import org.junit.runner.RunWith; | |
23 | +import org.mockito.runners.MockitoJUnitRunner; | |
24 | +import org.springframework.beans.factory.annotation.Autowired; | |
25 | +import org.springframework.test.util.ReflectionTestUtils; | |
26 | +import org.thingsboard.server.common.data.Device; | |
27 | +import org.thingsboard.server.common.data.UUIDConverter; | |
28 | +import org.thingsboard.server.common.data.id.DeviceId; | |
29 | +import org.thingsboard.server.common.msg.cluster.ServerAddress; | |
30 | +import org.thingsboard.server.common.msg.cluster.ServerType; | |
31 | +import org.thingsboard.server.service.cluster.discovery.DiscoveryService; | |
32 | +import org.thingsboard.server.service.cluster.discovery.ServerInstance; | |
33 | + | |
34 | +import java.io.IOException; | |
35 | +import java.nio.file.Files; | |
36 | +import java.nio.file.Paths; | |
37 | +import java.util.ArrayList; | |
38 | +import java.util.Comparator; | |
39 | +import java.util.HashMap; | |
40 | +import java.util.List; | |
41 | +import java.util.Map; | |
42 | +import java.util.UUID; | |
43 | +import java.util.stream.Collectors; | |
44 | + | |
45 | +import static org.mockito.Mockito.mock; | |
46 | +import static org.mockito.Mockito.when; | |
47 | + | |
48 | +@Slf4j | |
49 | +@RunWith(MockitoJUnitRunner.class) | |
50 | +public class ConsistentClusterRoutingServiceTest { | |
51 | + | |
52 | + private ConsistentClusterRoutingService clusterRoutingService; | |
53 | + | |
54 | + private DiscoveryService discoveryService; | |
55 | + | |
56 | + private String hashFunctionName = "murmur3_128"; | |
57 | + private Integer virtualNodesSize = 1024*64; | |
58 | + private ServerAddress currentServer = new ServerAddress(" 100.96.1.0", 9001, ServerType.CORE); | |
59 | + | |
60 | + | |
61 | + @Before | |
62 | + public void setup() throws Exception { | |
63 | + discoveryService = mock(DiscoveryService.class); | |
64 | + clusterRoutingService = new ConsistentClusterRoutingService(); | |
65 | + ReflectionTestUtils.setField(clusterRoutingService, "discoveryService", discoveryService); | |
66 | + ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName); | |
67 | + ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize); | |
68 | + when(discoveryService.getCurrentServer()).thenReturn(new ServerInstance(currentServer)); | |
69 | + List<ServerInstance> otherServers = new ArrayList<>(); | |
70 | + for (int i = 1; i < 30; i++) { | |
71 | + otherServers.add(new ServerInstance(new ServerAddress(" 100.96." + i*2 + "." + i, 9001, ServerType.CORE))); | |
72 | + } | |
73 | + when(discoveryService.getOtherServers()).thenReturn(otherServers); | |
74 | + clusterRoutingService.init(); | |
75 | + } | |
76 | + | |
77 | + @Test | |
78 | + public void testDispersionOnMillionDevices() { | |
79 | + List<DeviceId> devices = new ArrayList<>(); | |
80 | + for (int i = 0; i < 1000000; i++) { | |
81 | + devices.add(new DeviceId(UUIDs.timeBased())); | |
82 | + } | |
83 | + | |
84 | + testDevicesDispersion(devices); | |
85 | + } | |
86 | + | |
87 | + @Test | |
88 | + public void testDispersionOnDevicesFromFile() throws IOException { | |
89 | + List<String> deviceIdsStrList = Files.readAllLines(Paths.get("/home/ashvayka/Downloads/deviceIds.out")); | |
90 | + List<DeviceId> devices = deviceIdsStrList.stream().map(String::trim).filter(s -> !s.isEmpty()).map(UUIDConverter::fromString).map(DeviceId::new).collect(Collectors.toList()); | |
91 | + System.out.println("Devices: " + devices.size()); | |
92 | + testDevicesDispersion(devices); | |
93 | + testDevicesDispersion(devices); | |
94 | + testDevicesDispersion(devices); | |
95 | + testDevicesDispersion(devices); | |
96 | + testDevicesDispersion(devices); | |
97 | + | |
98 | + } | |
99 | + | |
100 | + private void testDevicesDispersion(List<DeviceId> devices) { | |
101 | + long start = System.currentTimeMillis(); | |
102 | + Map<ServerAddress, Integer> map = new HashMap<>(); | |
103 | + for (DeviceId deviceId : devices) { | |
104 | + ServerAddress address = clusterRoutingService.resolveById(deviceId).orElse(currentServer); | |
105 | + map.put(address, map.getOrDefault(address, 0) + 1); | |
106 | + } | |
107 | + | |
108 | + List<Map.Entry<ServerAddress, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList()); | |
109 | + long end = System.currentTimeMillis(); | |
110 | + System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + (data.get(data.size() - 1).getValue() - data.get(0).getValue())); | |
111 | + | |
112 | + for (Map.Entry<ServerAddress, Integer> entry : data) { | |
113 | +// System.out.println(entry.getKey().getHost() + ": " + entry.getValue()); | |
114 | + } | |
115 | + | |
116 | + } | |
117 | + | |
118 | +} | ... | ... |
... | ... | @@ -123,6 +123,12 @@ public interface TbContext { |
123 | 123 | |
124 | 124 | ScriptEngine createJsScriptEngine(String script, String... argNames); |
125 | 125 | |
126 | + void logJsEvalRequest(); | |
127 | + | |
128 | + void logJsEvalResponse(); | |
129 | + | |
130 | + void logJsEvalFailure(); | |
131 | + | |
126 | 132 | String getNodeId(); |
127 | 133 | |
128 | 134 | RuleChainTransactionService getRuleChainTransactionService(); | ... | ... |
... | ... | @@ -65,8 +65,10 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig |
65 | 65 | } |
66 | 66 | |
67 | 67 | private ListenableFuture<AlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) { |
68 | + ctx.logJsEvalRequest(); | |
68 | 69 | ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(ctx, msg, alarm.getDetails()); |
69 | 70 | return Futures.transformAsync(asyncDetails, details -> { |
71 | + ctx.logJsEvalResponse(); | |
70 | 72 | ListenableFuture<Boolean> clearFuture = ctx.getAlarmService().clearAlarm(ctx.getTenantId(), alarm.getId(), details, System.currentTimeMillis()); |
71 | 73 | return Futures.transformAsync(clearFuture, cleared -> { |
72 | 74 | ListenableFuture<Alarm> savedAlarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), alarm.getId()); | ... | ... |
... | ... | @@ -42,10 +42,10 @@ import java.io.IOException; |
42 | 42 | nodeDescription = "Create or Update Alarm", |
43 | 43 | nodeDetails = |
44 | 44 | "Details - JS function that creates JSON object based on incoming message. This object will be added into Alarm.details field.\n" + |
45 | - "Node output:\n" + | |
46 | - "If alarm was not created, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'matadata' will contains one of those properties 'isNewAlarm/isExistingAlarm'. " + | |
47 | - "Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>. " + | |
48 | - "Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>.", | |
45 | + "Node output:\n" + | |
46 | + "If alarm was not created, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'matadata' will contains one of those properties 'isNewAlarm/isExistingAlarm'. " + | |
47 | + "Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>. " + | |
48 | + "Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>.", | |
49 | 49 | uiResources = {"static/rulenode/rulenode-core-config.js"}, |
50 | 50 | configDirective = "tbActionNodeCreateAlarmConfig", |
51 | 51 | icon = "notifications_active" |
... | ... | @@ -103,11 +103,15 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf |
103 | 103 | |
104 | 104 | private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg, Alarm msgAlarm) { |
105 | 105 | ListenableFuture<Alarm> asyncAlarm; |
106 | - if (msgAlarm != null ) { | |
106 | + if (msgAlarm != null) { | |
107 | 107 | asyncAlarm = Futures.immediateCheckedFuture(msgAlarm); |
108 | 108 | } else { |
109 | + ctx.logJsEvalRequest(); | |
109 | 110 | asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null), |
110 | - details -> buildAlarm(msg, details, ctx.getTenantId())); | |
111 | + details -> { | |
112 | + ctx.logJsEvalResponse(); | |
113 | + return buildAlarm(msg, details, ctx.getTenantId()); | |
114 | + }); | |
111 | 115 | } |
112 | 116 | ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm, |
113 | 117 | alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor()); |
... | ... | @@ -115,7 +119,9 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf |
115 | 119 | } |
116 | 120 | |
117 | 121 | private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm existingAlarm, Alarm msgAlarm) { |
122 | + ctx.logJsEvalRequest(); | |
118 | 123 | ListenableFuture<Alarm> asyncUpdated = Futures.transform(buildAlarmDetails(ctx, msg, existingAlarm.getDetails()), (Function<JsonNode, Alarm>) details -> { |
124 | + ctx.logJsEvalResponse(); | |
119 | 125 | if (msgAlarm != null) { |
120 | 126 | existingAlarm.setSeverity(msgAlarm.getSeverity()); |
121 | 127 | existingAlarm.setPropagate(msgAlarm.isPropagate()); | ... | ... |
... | ... | @@ -53,12 +53,17 @@ public class TbLogNode implements TbNode { |
53 | 53 | @Override |
54 | 54 | public void onMsg(TbContext ctx, TbMsg msg) { |
55 | 55 | ListeningExecutor jsExecutor = ctx.getJsExecutor(); |
56 | + ctx.logJsEvalRequest(); | |
56 | 57 | withCallback(jsExecutor.executeAsync(() -> jsEngine.executeToString(msg)), |
57 | 58 | toString -> { |
59 | + ctx.logJsEvalResponse(); | |
58 | 60 | log.info(toString); |
59 | 61 | ctx.tellNext(msg, SUCCESS); |
60 | 62 | }, |
61 | - t -> ctx.tellFailure(msg, t)); | |
63 | + t -> { | |
64 | + ctx.logJsEvalResponse(); | |
65 | + ctx.tellFailure(msg, t); | |
66 | + }); | |
62 | 67 | } |
63 | 68 | |
64 | 69 | @Override | ... | ... |
... | ... | @@ -129,7 +129,9 @@ public class TbMsgGeneratorNode implements TbNode { |
129 | 129 | prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}"); |
130 | 130 | } |
131 | 131 | if (initialized) { |
132 | + ctx.logJsEvalRequest(); | |
132 | 133 | TbMsg generated = jsEngine.executeGenerate(prevMsg); |
134 | + ctx.logJsEvalResponse(); | |
133 | 135 | prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData()); |
134 | 136 | } |
135 | 137 | return prevMsg; | ... | ... |
... | ... | @@ -52,9 +52,16 @@ public class TbJsFilterNode implements TbNode { |
52 | 52 | @Override |
53 | 53 | public void onMsg(TbContext ctx, TbMsg msg) { |
54 | 54 | ListeningExecutor jsExecutor = ctx.getJsExecutor(); |
55 | + ctx.logJsEvalRequest(); | |
55 | 56 | withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)), |
56 | - filterResult -> ctx.tellNext(msg, filterResult ? "True" : "False"), | |
57 | - t -> ctx.tellFailure(msg, t)); | |
57 | + filterResult -> { | |
58 | + ctx.logJsEvalResponse(); | |
59 | + ctx.tellNext(msg, filterResult ? "True" : "False"); | |
60 | + }, | |
61 | + t -> { | |
62 | + ctx.tellFailure(msg, t); | |
63 | + ctx.logJsEvalFailure(); | |
64 | + }, ctx.getDbCallbackExecutor()); | |
58 | 65 | } |
59 | 66 | |
60 | 67 | @Override | ... | ... |
... | ... | @@ -54,9 +54,16 @@ public class TbJsSwitchNode implements TbNode { |
54 | 54 | @Override |
55 | 55 | public void onMsg(TbContext ctx, TbMsg msg) { |
56 | 56 | ListeningExecutor jsExecutor = ctx.getJsExecutor(); |
57 | + ctx.logJsEvalRequest(); | |
57 | 58 | withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(msg)), |
58 | - result -> processSwitch(ctx, msg, result), | |
59 | - t -> ctx.tellFailure(msg, t)); | |
59 | + result -> { | |
60 | + ctx.logJsEvalResponse(); | |
61 | + processSwitch(ctx, msg, result); | |
62 | + }, | |
63 | + t -> { | |
64 | + ctx.logJsEvalFailure(); | |
65 | + ctx.tellFailure(msg, t); | |
66 | + }, ctx.getDbCallbackExecutor()); | |
60 | 67 | } |
61 | 68 | |
62 | 69 | private void processSwitch(TbContext ctx, TbMsg msg, Set<String> nextRelations) { | ... | ... |
... | ... | @@ -44,14 +44,21 @@ public abstract class TbAbstractTransformNode implements TbNode { |
44 | 44 | @Override |
45 | 45 | public void onMsg(TbContext ctx, TbMsg msg) { |
46 | 46 | withCallback(transform(ctx, msg), |
47 | - m -> { | |
48 | - if (m != null) { | |
49 | - ctx.tellNext(m, SUCCESS); | |
50 | - } else { | |
51 | - ctx.tellNext(msg, FAILURE); | |
52 | - } | |
53 | - }, | |
54 | - t -> ctx.tellFailure(msg, t)); | |
47 | + m -> transformSuccess(ctx, msg, m), | |
48 | + t -> transformFailure(ctx, msg, t), | |
49 | + ctx.getDbCallbackExecutor()); | |
50 | + } | |
51 | + | |
52 | + protected void transformFailure(TbContext ctx, TbMsg msg, Throwable t) { | |
53 | + ctx.tellFailure(msg, t); | |
54 | + } | |
55 | + | |
56 | + protected void transformSuccess(TbContext ctx, TbMsg msg, TbMsg m) { | |
57 | + if (m != null) { | |
58 | + ctx.tellNext(m, SUCCESS); | |
59 | + } else { | |
60 | + ctx.tellNext(msg, FAILURE); | |
61 | + } | |
55 | 62 | } |
56 | 63 | |
57 | 64 | protected abstract ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg); | ... | ... |
... | ... | @@ -21,6 +21,9 @@ import org.thingsboard.rule.engine.api.*; |
21 | 21 | import org.thingsboard.server.common.data.plugin.ComponentType; |
22 | 22 | import org.thingsboard.server.common.msg.TbMsg; |
23 | 23 | |
24 | +import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; | |
25 | +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; | |
26 | + | |
24 | 27 | @RuleNode( |
25 | 28 | type = ComponentType.TRANSFORMATION, |
26 | 29 | name = "script", |
... | ... | @@ -49,10 +52,23 @@ public class TbTransformMsgNode extends TbAbstractTransformNode { |
49 | 52 | |
50 | 53 | @Override |
51 | 54 | protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) { |
55 | + ctx.logJsEvalRequest(); | |
52 | 56 | return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(msg)); |
53 | 57 | } |
54 | 58 | |
55 | 59 | @Override |
60 | + protected void transformSuccess(TbContext ctx, TbMsg msg, TbMsg m) { | |
61 | + ctx.logJsEvalResponse(); | |
62 | + super.transformSuccess(ctx, msg, m); | |
63 | + } | |
64 | + | |
65 | + @Override | |
66 | + protected void transformFailure(TbContext ctx, TbMsg msg, Throwable t) { | |
67 | + ctx.logJsEvalFailure(); | |
68 | + super.transformFailure(ctx, msg, t); | |
69 | + } | |
70 | + | |
71 | + @Override | |
56 | 72 | public void destroy() { |
57 | 73 | if (jsEngine != null) { |
58 | 74 | jsEngine.destroy(); | ... | ... |