Commit 0b4ec2b9b30c25ea930c9fa8b6f7a195e2573964

Authored by Andrew Shvayka
Committed by GitHub
2 parents ca355f04 647ccafc

Merge pull request #748 from thingsboard/develop/1.5-queue

Queue Put and Ack
Showing 37 changed files with 464 additions and 203 deletions
@@ -49,6 +49,7 @@ import org.thingsboard.server.dao.customer.CustomerService; @@ -49,6 +49,7 @@ import org.thingsboard.server.dao.customer.CustomerService;
49 import org.thingsboard.server.dao.device.DeviceService; 49 import org.thingsboard.server.dao.device.DeviceService;
50 import org.thingsboard.server.dao.event.EventService; 50 import org.thingsboard.server.dao.event.EventService;
51 import org.thingsboard.server.dao.plugin.PluginService; 51 import org.thingsboard.server.dao.plugin.PluginService;
  52 +import org.thingsboard.server.dao.queue.MsgQueue;
52 import org.thingsboard.server.dao.relation.RelationService; 53 import org.thingsboard.server.dao.relation.RelationService;
53 import org.thingsboard.server.dao.rule.RuleChainService; 54 import org.thingsboard.server.dao.rule.RuleChainService;
54 import org.thingsboard.server.dao.rule.RuleService; 55 import org.thingsboard.server.dao.rule.RuleService;
@@ -187,6 +188,10 @@ public class ActorSystemContext { @@ -187,6 +188,10 @@ public class ActorSystemContext {
187 @Getter 188 @Getter
188 private MailService mailService; 189 private MailService mailService;
189 190
  191 + @Autowired
  192 + @Getter
  193 + private MsgQueue msgQueue;
  194 +
190 @Value("${actors.session.sync.timeout}") 195 @Value("${actors.session.sync.timeout}")
191 @Getter 196 @Getter
192 private long syncSessionTimeout; 197 private long syncSessionTimeout;
@@ -204,9 +204,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -204,9 +204,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
204 void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) { 204 void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
205 PendingSessionMsgData data = pendingMsgs.remove(msg.getId()); 205 PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
206 if (data != null && data.isReplyOnQueueAck()) { 206 if (data != null && data.isReplyOnQueueAck()) {
207 - logger.debug("[{}] Queue put [{}] ack detected!", deviceId, msg.getId());  
208 - ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());  
209 - sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress()); 207 + int remainingAcks = data.getAckMsgCount() - 1;
  208 + data.setAckMsgCount(remainingAcks);
  209 + logger.debug("[{}] Queue put [{}] ack detected. Remaining acks: {}!", deviceId, msg.getId(), remainingAcks);
  210 + if (remainingAcks == 0) {
  211 + ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onSuccess(data.getSessionMsgType(), data.getRequestId());
  212 + sendMsgToSessionActor(new BasicToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
  213 + }
210 } 214 }
211 } 215 }
212 216
@@ -320,8 +324,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -320,8 +324,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
320 kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); 324 kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
321 } 325 }
322 326
323 - TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json));  
324 - pushToRuleEngineWithTimeout(context, tbMsg, src, request); 327 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
  328 + PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
  329 + SessionMsgType.POST_ATTRIBUTES_REQUEST, request.getRequestId(), true, 1);
  330 + pushToRuleEngineWithTimeout(context, tbMsg, msgData);
325 } 331 }
326 332
327 private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) { 333 private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) {
@@ -329,10 +335,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -329,10 +335,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
329 335
330 Map<Long, List<KvEntry>> tsData = request.getData(); 336 Map<Long, List<KvEntry>> tsData = request.getData();
331 337
332 - JsonArray json = new JsonArray(); 338 + PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
  339 + SessionMsgType.POST_TELEMETRY_REQUEST, request.getRequestId(), true, tsData.size());
  340 +
333 for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) { 341 for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
334 - JsonObject ts = new JsonObject();  
335 - ts.addProperty("ts", entry.getKey()); 342 + JsonObject json = new JsonObject();
  343 + json.addProperty("ts", entry.getKey());
336 JsonObject values = new JsonObject(); 344 JsonObject values = new JsonObject();
337 for (KvEntry kv : entry.getValue()) { 345 for (KvEntry kv : entry.getValue()) {
338 kv.getBooleanValue().ifPresent(v -> values.addProperty(kv.getKey(), v)); 346 kv.getBooleanValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
@@ -340,12 +348,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -340,12 +348,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
340 kv.getDoubleValue().ifPresent(v -> values.addProperty(kv.getKey(), v)); 348 kv.getDoubleValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
341 kv.getStrValue().ifPresent(v -> values.addProperty(kv.getKey(), v)); 349 kv.getStrValue().ifPresent(v -> values.addProperty(kv.getKey(), v));
342 } 350 }
343 - ts.add("values", values);  
344 - json.add(ts); 351 + json.add("values", values);
  352 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
  353 + pushToRuleEngineWithTimeout(context, tbMsg, msgData);
345 } 354 }
346 -  
347 - TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, defaultMetaData, TbMsgDataType.JSON, gson.toJson(json));  
348 - pushToRuleEngineWithTimeout(context, tbMsg, src, request);  
349 } 355 }
350 356
351 private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) { 357 private void handleClientSideRPCRequest(ActorContext context, DeviceToDeviceActorMsg src) {
@@ -357,8 +363,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -357,8 +363,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
357 363
358 TbMsgMetaData requestMetaData = defaultMetaData.copy(); 364 TbMsgMetaData requestMetaData = defaultMetaData.copy();
359 requestMetaData.putValue("requestId", Integer.toString(request.getRequestId())); 365 requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
360 - TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json));  
361 - pushToRuleEngineWithTimeout(context, tbMsg, src, request); 366 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
  367 + PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), SessionMsgType.TO_SERVER_RPC_REQUEST, request.getRequestId(), false, 1);
  368 + pushToRuleEngineWithTimeout(context, tbMsg, msgData);
362 369
363 scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout()); 370 scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
364 toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress())); 371 toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(src.getSessionId(), src.getSessionType(), src.getServerAddress()));
@@ -380,19 +387,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -380,19 +387,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
380 } 387 }
381 } 388 }
382 389
383 - private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg) {  
384 - pushToRuleEngineWithTimeout(context, tbMsg, src, fromDeviceRequestMsg, true);  
385 - }  
386 -  
387 - private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, DeviceToDeviceActorMsg src, FromDeviceRequestMsg fromDeviceRequestMsg, boolean replyOnAck) {  
388 - SessionMsgType sessionMsgType = fromDeviceRequestMsg.getMsgType();  
389 - int requestId = fromDeviceRequestMsg.getRequestId(); 390 + private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, PendingSessionMsgData pendingMsgData) {
  391 + SessionMsgType sessionMsgType = pendingMsgData.getSessionMsgType();
  392 + int requestId = pendingMsgData.getRequestId();
390 if (systemContext.isQueuePersistenceEnabled()) { 393 if (systemContext.isQueuePersistenceEnabled()) {
391 - pendingMsgs.put(tbMsg.getId(), new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), sessionMsgType, requestId, replyOnAck)); 394 + pendingMsgs.put(tbMsg.getId(), pendingMsgData);
392 scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout()); 395 scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
393 } else { 396 } else {
394 - ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), src.getSessionId());  
395 - sendMsgToSessionActor(response, src.getServerAddress()); 397 + ToDeviceSessionActorMsg response = new BasicToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
  398 + sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
396 } 399 }
397 context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self()); 400 context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
398 } 401 }
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.actors.device; 16 package org.thingsboard.server.actors.device;
17 17
  18 +import lombok.AllArgsConstructor;
18 import lombok.Data; 19 import lombok.Data;
19 import org.thingsboard.server.common.data.id.SessionId; 20 import org.thingsboard.server.common.data.id.SessionId;
20 import org.thingsboard.server.common.msg.cluster.ServerAddress; 21 import org.thingsboard.server.common.msg.cluster.ServerAddress;
@@ -26,6 +27,7 @@ import java.util.Optional; @@ -26,6 +27,7 @@ import java.util.Optional;
26 * Created by ashvayka on 17.04.18. 27 * Created by ashvayka on 17.04.18.
27 */ 28 */
28 @Data 29 @Data
  30 +@AllArgsConstructor
29 public final class PendingSessionMsgData { 31 public final class PendingSessionMsgData {
30 32
31 private final SessionId sessionId; 33 private final SessionId sessionId;
@@ -33,5 +35,6 @@ public final class PendingSessionMsgData { @@ -33,5 +35,6 @@ public final class PendingSessionMsgData {
33 private final SessionMsgType sessionMsgType; 35 private final SessionMsgType sessionMsgType;
34 private final int requestId; 36 private final int requestId;
35 private final boolean replyOnQueueAck; 37 private final boolean replyOnQueueAck;
  38 + private int ackMsgCount;
36 39
37 } 40 }
@@ -53,7 +53,6 @@ import java.util.function.Consumer; @@ -53,7 +53,6 @@ import java.util.function.Consumer;
53 */ 53 */
54 class DefaultTbContext implements TbContext { 54 class DefaultTbContext implements TbContext {
55 55
56 - private static final Function<? super List<Void>, ? extends Void> LIST_VOID_FUNCTION = v -> null;  
57 private final ActorSystemContext mainCtx; 56 private final ActorSystemContext mainCtx;
58 private final RuleNodeCtx nodeCtx; 57 private final RuleNodeCtx nodeCtx;
59 58
@@ -120,7 +119,7 @@ class DefaultTbContext implements TbContext { @@ -120,7 +119,7 @@ class DefaultTbContext implements TbContext {
120 119
121 @Override 120 @Override
122 public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { 121 public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
123 - return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data); 122 + return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
124 } 123 }
125 124
126 @Override 125 @Override
@@ -239,7 +238,6 @@ class DefaultTbContext implements TbContext { @@ -239,7 +238,6 @@ class DefaultTbContext implements TbContext {
239 .build()); 238 .build());
240 }); 239 });
241 } 240 }
242 -  
243 }; 241 };
244 } 242 }
245 } 243 }
@@ -19,6 +19,7 @@ import akka.actor.ActorContext; @@ -19,6 +19,7 @@ import akka.actor.ActorContext;
19 import akka.actor.ActorRef; 19 import akka.actor.ActorRef;
20 import akka.actor.Props; 20 import akka.actor.Props;
21 import akka.event.LoggingAdapter; 21 import akka.event.LoggingAdapter;
  22 +import com.datastax.driver.core.utils.UUIDs;
22 import org.thingsboard.server.actors.ActorSystemContext; 23 import org.thingsboard.server.actors.ActorSystemContext;
23 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg; 24 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
24 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg; 25 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
@@ -41,6 +42,7 @@ import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; @@ -41,6 +42,7 @@ import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
41 import org.thingsboard.server.dao.rule.RuleChainService; 42 import org.thingsboard.server.dao.rule.RuleChainService;
42 43
43 import java.util.ArrayList; 44 import java.util.ArrayList;
  45 +import java.util.Collections;
44 import java.util.HashMap; 46 import java.util.HashMap;
45 import java.util.List; 47 import java.util.List;
46 import java.util.Map; 48 import java.util.Map;
@@ -52,6 +54,7 @@ import java.util.stream.Collectors; @@ -52,6 +54,7 @@ import java.util.stream.Collectors;
52 */ 54 */
53 public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> { 55 public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
54 56
  57 + private static final long DEFAULT_CLUSTER_PARTITION = 0L;
55 private final ActorRef parent; 58 private final ActorRef parent;
56 private final ActorRef self; 59 private final ActorRef self;
57 private final Map<RuleNodeId, RuleNodeCtx> nodeActors; 60 private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
@@ -83,6 +86,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -83,6 +86,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
83 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); 86 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
84 } 87 }
85 initRoutes(ruleChain, ruleNodeList); 88 initRoutes(ruleChain, ruleNodeList);
  89 + //TODO: read all messages from queues of the actors and push then to the corresponding node actors;
86 started = true; 90 started = true;
87 } else { 91 } else {
88 onUpdate(context); 92 onUpdate(context);
@@ -142,15 +146,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -142,15 +146,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
142 // Populating the routes map; 146 // Populating the routes map;
143 for (RuleNode ruleNode : ruleNodeList) { 147 for (RuleNode ruleNode : ruleNodeList) {
144 List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId()); 148 List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
145 - for (EntityRelation relation : relations) {  
146 - if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {  
147 - RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));  
148 - if (ruleNodeCtx == null) {  
149 - throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]"); 149 + if (relations.size() == 0) {
  150 + nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
  151 + } else {
  152 + for (EntityRelation relation : relations) {
  153 + if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
  154 + RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
  155 + if (ruleNodeCtx == null) {
  156 + throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
  157 + }
150 } 158 }
  159 + nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
  160 + .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
151 } 161 }
152 - nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())  
153 - .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));  
154 } 162 }
155 } 163 }
156 164
@@ -161,45 +169,62 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -161,45 +169,62 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
161 169
162 void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) { 170 void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
163 checkActive(); 171 checkActive();
164 - TbMsg tbMsg = envelope.getTbMsg();  
165 - //TODO: push to queue and act on ack in async way  
166 - pushMsgToNode(firstNode, tbMsg); 172 + putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg));
167 } 173 }
168 174
169 - public void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) { 175 + void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
170 checkActive(); 176 checkActive();
171 - TbMsg tbMsg = envelope.getTbMsg();  
172 - //TODO: push to queue and act on ack in async way  
173 - pushMsgToNode(firstNode, tbMsg);  
174 - envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(tbMsg.getId()), self); 177 + putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
  178 + pushMsgToNode(firstNode, msg);
  179 + envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
  180 + });
175 } 181 }
176 182
177 void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) { 183 void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
178 checkActive(); 184 checkActive();
179 RuleNodeId originator = envelope.getOriginator(); 185 RuleNodeId originator = envelope.getOriginator();
180 String targetRelationType = envelope.getRelationType(); 186 String targetRelationType = envelope.getRelationType();
181 - List<RuleNodeRelation> relations = nodeRoutes.get(originator);  
182 - if (relations == null) {  
183 - return;  
184 - }  
185 - boolean copy = relations.size() > 1;  
186 - for (RuleNodeRelation relation : relations) {  
187 - TbMsg msg = envelope.getMsg();  
188 - if (copy) {  
189 - msg = msg.copy(); 187 + List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
  188 + .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
  189 + .collect(Collectors.toList());
  190 +
  191 + TbMsg msg = envelope.getMsg();
  192 + int relationsCount = relations.size();
  193 + if (relationsCount == 0) {
  194 + queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition());
  195 + } else if (relationsCount == 1) {
  196 + for (RuleNodeRelation relation : relations) {
  197 + pushToTarget(msg, relation.getOut());
190 } 198 }
191 - if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {  
192 - switch (relation.getOut().getEntityType()) { 199 + } else {
  200 + for (RuleNodeRelation relation : relations) {
  201 + EntityId target = relation.getOut();
  202 + switch (target.getEntityType()) {
193 case RULE_NODE: 203 case RULE_NODE:
194 - RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());  
195 - RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);  
196 - pushMsgToNode(targetRuleNode, msg); 204 + RuleNodeId targetId = new RuleNodeId(target.getId());
  205 + RuleNodeCtx targetNodeCtx = nodeActors.get(targetId);
  206 + TbMsg copy = msg.copy(UUIDs.timeBased(), entityId, targetId, DEFAULT_CLUSTER_PARTITION);
  207 + putToQueue(copy, queuedMsg -> pushMsgToNode(targetNodeCtx, queuedMsg));
197 break; 208 break;
198 case RULE_CHAIN: 209 case RULE_CHAIN:
199 -// TODO: implement 210 + parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, true), self);
200 break; 211 break;
201 } 212 }
202 } 213 }
  214 + //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
  215 + EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
  216 + queue.ack(msg, ackId.getId(), msg.getClusterPartition());
  217 + }
  218 + }
  219 +
  220 + private void pushToTarget(TbMsg msg, EntityId target) {
  221 + switch (target.getEntityType()) {
  222 + case RULE_NODE:
  223 + pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg);
  224 + break;
  225 + case RULE_CHAIN:
  226 + parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, false), self);
  227 + break;
203 } 228 }
204 } 229 }
205 230
@@ -208,4 +233,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -208,4 +233,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
208 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self); 233 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
209 } 234 }
210 } 235 }
  236 +
  237 + private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
  238 + // We don't put firstNodeId because it may change over time;
  239 + return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData(), tbMsg.getData(), entityId, null, 0L);
  240 + }
211 } 241 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.ruleChain;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.TbContext;
  20 +import org.thingsboard.server.common.data.id.RuleChainId;
  21 +import org.thingsboard.server.common.msg.MsgType;
  22 +import org.thingsboard.server.common.msg.TbActorMsg;
  23 +import org.thingsboard.server.common.msg.TbMsg;
  24 +
  25 +/**
  26 + * Created by ashvayka on 19.03.18.
  27 + */
  28 +@Data
  29 +final class RuleChainToRuleChainMsg implements TbActorMsg {
  30 +
  31 + private final RuleChainId target;
  32 + private final RuleChainId source;
  33 + private final TbMsg msg;
  34 + private final boolean enqueue;
  35 +
  36 + @Override
  37 + public MsgType getMsgType() {
  38 + return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
  39 + }
  40 +}
@@ -17,22 +17,32 @@ package org.thingsboard.server.actors.shared; @@ -17,22 +17,32 @@ package org.thingsboard.server.actors.shared;
17 17
18 import akka.actor.ActorContext; 18 import akka.actor.ActorContext;
19 import akka.event.LoggingAdapter; 19 import akka.event.LoggingAdapter;
  20 +import com.google.common.util.concurrent.FutureCallback;
  21 +import com.google.common.util.concurrent.Futures;
20 import org.thingsboard.server.actors.ActorSystemContext; 22 import org.thingsboard.server.actors.ActorSystemContext;
21 import org.thingsboard.server.actors.stats.StatsPersistTick; 23 import org.thingsboard.server.actors.stats.StatsPersistTick;
  24 +import org.thingsboard.server.common.data.id.EntityId;
22 import org.thingsboard.server.common.data.id.TenantId; 25 import org.thingsboard.server.common.data.id.TenantId;
23 import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; 26 import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
  27 +import org.thingsboard.server.common.msg.TbMsg;
24 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; 28 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  29 +import org.thingsboard.server.dao.queue.MsgQueue;
25 30
26 -public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgProcessor { 31 +import javax.annotation.Nullable;
  32 +import java.util.function.Consumer;
  33 +
  34 +public abstract class ComponentMsgProcessor<T extends EntityId> extends AbstractContextAwareMsgProcessor {
27 35
28 protected final TenantId tenantId; 36 protected final TenantId tenantId;
29 protected final T entityId; 37 protected final T entityId;
  38 + protected final MsgQueue queue;
30 protected ComponentLifecycleState state; 39 protected ComponentLifecycleState state;
31 40
32 protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) { 41 protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) {
33 super(systemContext, logger); 42 super(systemContext, logger);
34 this.tenantId = tenantId; 43 this.tenantId = tenantId;
35 this.entityId = id; 44 this.entityId = id;
  45 + this.queue = systemContext.getMsgQueue();
36 } 46 }
37 47
38 public abstract void start(ActorContext context) throws Exception; 48 public abstract void start(ActorContext context) throws Exception;
@@ -75,4 +85,19 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr @@ -75,4 +85,19 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr
75 throw new IllegalStateException("Rule chain is not active!"); 85 throw new IllegalStateException("Rule chain is not active!");
76 } 86 }
77 } 87 }
  88 +
  89 + protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
  90 + EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
  91 + Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
  92 + @Override
  93 + public void onSuccess(@Nullable Void result) {
  94 + onSuccess.accept(tbMsg);
  95 + }
  96 +
  97 + @Override
  98 + public void onFailure(Throwable t) {
  99 + logger.debug("Failed to push message [{}] to queue due to [{}]", tbMsg, t);
  100 + }
  101 + });
  102 + }
78 } 103 }
@@ -237,7 +237,7 @@ public class RuleChainController extends BaseController { @@ -237,7 +237,7 @@ public class RuleChainController extends BaseController {
237 ScriptEngine engine = null; 237 ScriptEngine engine = null;
238 try { 238 try {
239 engine = new NashornJsEngine(script, functionName, argNames); 239 engine = new NashornJsEngine(script, functionName, argNames);
240 - TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data); 240 + TbMsg inMsg = new TbMsg(UUIDs.timeBased(), msgType, null, new TbMsgMetaData(metadata), data, null, null, 0L);
241 switch (scriptType) { 241 switch (scriptType) {
242 case "update": 242 case "update":
243 output = msgToOutput(engine.executeUpdate(inMsg)); 243 output = msgToOutput(engine.executeUpdate(inMsg));
@@ -118,7 +118,7 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn @@ -118,7 +118,7 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn
118 String newData = data != null ? data : msg.getData(); 118 String newData = data != null ? data : msg.getData();
119 TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData(); 119 TbMsgMetaData newMetadata = metadata != null ? new TbMsgMetaData(metadata) : msg.getMetaData();
120 String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType(); 120 String newMessageType = !StringUtils.isEmpty(messageType) ? messageType : msg.getType();
121 - return new TbMsg(msg.getId(), newMessageType, msg.getOriginator(), newMetadata, newData); 121 + return new TbMsg(msg.getId(), newMessageType, msg.getOriginator(), newMetadata, newData, msg.getRuleChainId(), msg.getRuleNodeId(), msg.getClusterPartition());
122 } catch (Throwable th) { 122 } catch (Throwable th) {
123 th.printStackTrace(); 123 th.printStackTrace();
124 throw new RuntimeException("Failed to unbind message data from javascript result", th); 124 throw new RuntimeException("Failed to unbind message data from javascript result", th);
@@ -36,7 +36,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @@ -36,7 +36,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
36 36
37 public abstract class BaseComponentDescriptorControllerTest extends AbstractControllerTest { 37 public abstract class BaseComponentDescriptorControllerTest extends AbstractControllerTest {
38 38
39 - private static final int AMOUNT_OF_DEFAULT_FILTER_NODES = 3; 39 + private static final int AMOUNT_OF_DEFAULT_FILTER_NODES = 4;
40 private Tenant savedTenant; 40 private Tenant savedTenant;
41 private User tenantAdmin; 41 private User tenantAdmin;
42 42
@@ -87,7 +87,7 @@ public abstract class BaseComponentDescriptorControllerTest extends AbstractCont @@ -87,7 +87,7 @@ public abstract class BaseComponentDescriptorControllerTest extends AbstractCont
87 }); 87 });
88 88
89 Assert.assertNotNull(descriptors); 89 Assert.assertNotNull(descriptors);
90 - Assert.assertEquals(AMOUNT_OF_DEFAULT_FILTER_NODES, descriptors.size()); 90 + Assert.assertTrue(descriptors.size() >= AMOUNT_OF_DEFAULT_FILTER_NODES);
91 91
92 for (ComponentType type : ComponentType.values()) { 92 for (ComponentType type : ComponentType.values()) {
93 doGet("/api/components/" + type).andExpect(status().isOk()); 93 doGet("/api/components/" + type).andExpect(status().isOk());
@@ -80,7 +80,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -80,7 +80,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
80 } 80 }
81 } 81 }
82 82
83 - @Ignore  
84 @Test 83 @Test
85 public void testServerMqttOneWayRpc() throws Exception { 84 public void testServerMqttOneWayRpc() throws Exception {
86 Device device = new Device(); 85 Device device = new Device();
@@ -107,7 +106,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -107,7 +106,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
107 Assert.assertTrue(StringUtils.isEmpty(result)); 106 Assert.assertTrue(StringUtils.isEmpty(result));
108 } 107 }
109 108
110 - @Ignore  
111 @Test 109 @Test
112 public void testServerMqttOneWayRpcDeviceOffline() throws Exception { 110 public void testServerMqttOneWayRpcDeviceOffline() throws Exception {
113 Device device = new Device(); 111 Device device = new Device();
@@ -15,7 +15,6 @@ @@ -15,7 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.mqtt.rpc.sql; 16 package org.thingsboard.server.mqtt.rpc.sql;
17 17
18 -import org.thingsboard.server.dao.service.DaoNoSqlTest;  
19 import org.thingsboard.server.dao.service.DaoSqlTest; 18 import org.thingsboard.server.dao.service.DaoSqlTest;
20 import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest; 19 import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest;
21 20
@@ -150,7 +150,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -150,7 +150,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
150 "CUSTOM", 150 "CUSTOM",
151 device.getId(), 151 device.getId(),
152 new TbMsgMetaData(), 152 new TbMsgMetaData(),
153 - "{}"); 153 + "{}", null, null, 0L);
154 actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); 154 actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
155 155
156 Thread.sleep(3000); 156 Thread.sleep(3000);
@@ -138,7 +138,8 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac @@ -138,7 +138,8 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
138 "CUSTOM", 138 "CUSTOM",
139 device.getId(), 139 device.getId(),
140 new TbMsgMetaData(), 140 new TbMsgMetaData(),
141 - "{}"); 141 + "{}",
  142 + null, null, 0L);
142 actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); 143 actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
143 144
144 Thread.sleep(3000); 145 Thread.sleep(3000);
@@ -42,7 +42,7 @@ public class NashornJsEngineTest { @@ -42,7 +42,7 @@ public class NashornJsEngineTest {
42 metaData.putValue("humidity", "99"); 42 metaData.putValue("humidity", "99");
43 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 43 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
44 44
45 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 45 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
46 46
47 TbMsg actual = scriptEngine.executeUpdate(msg); 47 TbMsg actual = scriptEngine.executeUpdate(msg);
48 assertEquals("70", actual.getMetaData().getValue("temp")); 48 assertEquals("70", actual.getMetaData().getValue("temp"));
@@ -57,7 +57,7 @@ public class NashornJsEngineTest { @@ -57,7 +57,7 @@ public class NashornJsEngineTest {
57 metaData.putValue("humidity", "99"); 57 metaData.putValue("humidity", "99");
58 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 58 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
59 59
60 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 60 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
61 61
62 TbMsg actual = scriptEngine.executeUpdate(msg); 62 TbMsg actual = scriptEngine.executeUpdate(msg);
63 assertEquals("94", actual.getMetaData().getValue("newAttr")); 63 assertEquals("94", actual.getMetaData().getValue("newAttr"));
@@ -72,7 +72,7 @@ public class NashornJsEngineTest { @@ -72,7 +72,7 @@ public class NashornJsEngineTest {
72 metaData.putValue("humidity", "99"); 72 metaData.putValue("humidity", "99");
73 String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}"; 73 String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
74 74
75 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 75 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
76 76
77 TbMsg actual = scriptEngine.executeUpdate(msg); 77 TbMsg actual = scriptEngine.executeUpdate(msg);
78 78
@@ -89,7 +89,7 @@ public class NashornJsEngineTest { @@ -89,7 +89,7 @@ public class NashornJsEngineTest {
89 metaData.putValue("humidity", "99"); 89 metaData.putValue("humidity", "99");
90 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 90 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
91 91
92 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 92 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
93 assertFalse(scriptEngine.executeFilter(msg)); 93 assertFalse(scriptEngine.executeFilter(msg));
94 } 94 }
95 95
@@ -102,7 +102,7 @@ public class NashornJsEngineTest { @@ -102,7 +102,7 @@ public class NashornJsEngineTest {
102 metaData.putValue("humidity", "99"); 102 metaData.putValue("humidity", "99");
103 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 103 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
104 104
105 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 105 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
106 assertTrue(scriptEngine.executeFilter(msg)); 106 assertTrue(scriptEngine.executeFilter(msg));
107 } 107 }
108 108
@@ -122,7 +122,7 @@ public class NashornJsEngineTest { @@ -122,7 +122,7 @@ public class NashornJsEngineTest {
122 metaData.putValue("humidity", "99"); 122 metaData.putValue("humidity", "99");
123 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 123 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
124 124
125 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 125 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
126 Set<String> actual = scriptEngine.executeSwitch(msg); 126 Set<String> actual = scriptEngine.executeSwitch(msg);
127 assertEquals(Sets.newHashSet("one"), actual); 127 assertEquals(Sets.newHashSet("one"), actual);
128 } 128 }
@@ -143,7 +143,7 @@ public class NashornJsEngineTest { @@ -143,7 +143,7 @@ public class NashornJsEngineTest {
143 metaData.putValue("humidity", "99"); 143 metaData.putValue("humidity", "99");
144 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 144 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
145 145
146 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 146 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
147 Set<String> actual = scriptEngine.executeSwitch(msg); 147 Set<String> actual = scriptEngine.executeSwitch(msg);
148 assertEquals(Sets.newHashSet("one", "three"), actual); 148 assertEquals(Sets.newHashSet("one", "three"), actual);
149 } 149 }
@@ -48,6 +48,11 @@ public enum MsgType { @@ -48,6 +48,11 @@ public enum MsgType {
48 RULE_CHAIN_TO_RULE_MSG, 48 RULE_CHAIN_TO_RULE_MSG,
49 49
50 /** 50 /**
  51 + * Message that is sent by RuleChainActor to other RuleChainActor with command to process TbMsg.
  52 + */
  53 + RULE_CHAIN_TO_RULE_CHAIN_MSG,
  54 +
  55 + /**
51 * Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain. 56 * Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain.
52 */ 57 */
53 RULE_TO_RULE_CHAIN_TELL_NEXT_MSG, 58 RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
@@ -15,12 +15,13 @@ @@ -15,12 +15,13 @@
15 */ 15 */
16 package org.thingsboard.server.common.msg; 16 package org.thingsboard.server.common.msg;
17 17
18 -import com.google.protobuf.ByteString;  
19 import com.google.protobuf.InvalidProtocolBufferException; 18 import com.google.protobuf.InvalidProtocolBufferException;
20 import lombok.AllArgsConstructor; 19 import lombok.AllArgsConstructor;
21 import lombok.Data; 20 import lombok.Data;
22 import org.thingsboard.server.common.data.id.EntityId; 21 import org.thingsboard.server.common.data.id.EntityId;
23 import org.thingsboard.server.common.data.id.EntityIdFactory; 22 import org.thingsboard.server.common.data.id.EntityIdFactory;
  23 +import org.thingsboard.server.common.data.id.RuleChainId;
  24 +import org.thingsboard.server.common.data.id.RuleNodeId;
24 import org.thingsboard.server.common.msg.gen.MsgProtos; 25 import org.thingsboard.server.common.msg.gen.MsgProtos;
25 26
26 import java.io.Serializable; 27 import java.io.Serializable;
@@ -41,22 +42,40 @@ public final class TbMsg implements Serializable { @@ -41,22 +42,40 @@ public final class TbMsg implements Serializable {
41 private final TbMsgDataType dataType; 42 private final TbMsgDataType dataType;
42 private final String data; 43 private final String data;
43 44
44 - public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, String data) { 45 + //The following fields are not persisted to DB, because they can always be recovered from the context;
  46 + private final RuleChainId ruleChainId;
  47 + private final RuleNodeId ruleNodeId;
  48 + private final long clusterPartition;
  49 +
  50 + public TbMsg(UUID id, String type, EntityId originator, TbMsgMetaData metaData, String data,
  51 + RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
45 this.id = id; 52 this.id = id;
46 this.type = type; 53 this.type = type;
47 this.originator = originator; 54 this.originator = originator;
48 this.metaData = metaData; 55 this.metaData = metaData;
49 - this.dataType = TbMsgDataType.JSON;  
50 this.data = data; 56 this.data = data;
  57 + this.dataType = TbMsgDataType.JSON;
  58 + this.ruleChainId = ruleChainId;
  59 + this.ruleNodeId = ruleNodeId;
  60 + this.clusterPartition = clusterPartition;
51 } 61 }
52 62
53 public static ByteBuffer toBytes(TbMsg msg) { 63 public static ByteBuffer toBytes(TbMsg msg) {
54 MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder(); 64 MsgProtos.TbMsgProto.Builder builder = MsgProtos.TbMsgProto.newBuilder();
55 builder.setId(msg.getId().toString()); 65 builder.setId(msg.getId().toString());
56 builder.setType(msg.getType()); 66 builder.setType(msg.getType());
57 - if (msg.getOriginator() != null) {  
58 - builder.setEntityType(msg.getOriginator().getEntityType().name());  
59 - builder.setEntityId(msg.getOriginator().getId().toString()); 67 + builder.setEntityType(msg.getOriginator().getEntityType().name());
  68 + builder.setEntityIdMSB(msg.getOriginator().getId().getMostSignificantBits());
  69 + builder.setEntityIdLSB(msg.getOriginator().getId().getLeastSignificantBits());
  70 +
  71 + if (msg.getRuleChainId() != null) {
  72 + builder.setRuleChainIdMSB(msg.getRuleChainId().getId().getMostSignificantBits());
  73 + builder.setRuleChainIdLSB(msg.getRuleChainId().getId().getLeastSignificantBits());
  74 + }
  75 +
  76 + if (msg.getRuleNodeId() != null) {
  77 + builder.setRuleNodeIdMSB(msg.getRuleNodeId().getId().getMostSignificantBits());
  78 + builder.setRuleNodeIdLSB(msg.getRuleNodeId().getId().getLeastSignificantBits());
60 } 79 }
61 80
62 if (msg.getMetaData() != null) { 81 if (msg.getMetaData() != null) {
@@ -73,15 +92,18 @@ public final class TbMsg implements Serializable { @@ -73,15 +92,18 @@ public final class TbMsg implements Serializable {
73 try { 92 try {
74 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array()); 93 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
75 TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); 94 TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
76 - EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()); 95 + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
  96 + RuleChainId ruleChainId = new RuleChainId(new UUID(proto.getRuleChainIdMSB(), proto.getRuleChainIdLSB()));
  97 + RuleNodeId ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleChainIdLSB()));
77 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; 98 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
78 - return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData()); 99 + return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getClusterPartition());
79 } catch (InvalidProtocolBufferException e) { 100 } catch (InvalidProtocolBufferException e) {
80 throw new IllegalStateException("Could not parse protobuf for TbMsg", e); 101 throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
81 } 102 }
82 } 103 }
83 104
84 - public TbMsg copy() {  
85 - return new TbMsg(id, type, originator, metaData.copy(), dataType, data); 105 + public TbMsg copy(UUID newId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, long clusterPartition) {
  106 + return new TbMsg(newId, type, originator, metaData, dataType, data, ruleChainId, ruleNodeId, clusterPartition);
86 } 107 }
  108 +
87 } 109 }
@@ -27,10 +27,19 @@ message TbMsgProto { @@ -27,10 +27,19 @@ message TbMsgProto {
27 string id = 1; 27 string id = 1;
28 string type = 2; 28 string type = 2;
29 string entityType = 3; 29 string entityType = 3;
30 - string entityId = 4; 30 + int64 entityIdMSB = 4;
  31 + int64 entityIdLSB = 5;
31 32
32 - TbMsgMetaDataProto metaData = 5; 33 + int64 ruleChainIdMSB = 6;
  34 + int64 ruleChainIdLSB = 7;
  35 +
  36 + int64 ruleNodeIdMSB = 8;
  37 + int64 ruleNodeIdLSB = 9;
  38 + int64 clusterPartition = 10;
  39 +
  40 + TbMsgMetaDataProto metaData = 11;
  41 +
  42 + int32 dataType = 12;
  43 + string data = 13;
33 44
34 - int32 dataType = 6;  
35 - string data = 7;  
36 } 45 }
dao/src/main/java/org/thingsboard/server/dao/queue/MsgQueue.java renamed from dao/src/main/java/org/thingsboard/server/dao/queue/MsqQueue.java
@@ -20,7 +20,7 @@ import org.thingsboard.server.common.msg.TbMsg; @@ -20,7 +20,7 @@ import org.thingsboard.server.common.msg.TbMsg;
20 20
21 import java.util.UUID; 21 import java.util.UUID;
22 22
23 -public interface MsqQueue { 23 +public interface MsgQueue {
24 24
25 ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition); 25 ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition);
26 26
@@ -27,10 +27,9 @@ import lombok.extern.slf4j.Slf4j; @@ -27,10 +27,9 @@ import lombok.extern.slf4j.Slf4j;
27 import org.springframework.beans.factory.annotation.Autowired; 27 import org.springframework.beans.factory.annotation.Autowired;
28 import org.springframework.boot.CommandLineRunner; 28 import org.springframework.boot.CommandLineRunner;
29 import org.springframework.boot.SpringApplication; 29 import org.springframework.boot.SpringApplication;
30 -import org.springframework.boot.autoconfigure.EnableAutoConfiguration;  
31 -import org.springframework.boot.autoconfigure.SpringBootApplication;  
32 import org.springframework.context.annotation.Bean; 30 import org.springframework.context.annotation.Bean;
33 -import org.springframework.context.annotation.ComponentScan; 31 +import org.thingsboard.server.common.data.id.RuleChainId;
  32 +import org.thingsboard.server.common.data.id.RuleNodeId;
34 import org.thingsboard.server.common.msg.TbMsg; 33 import org.thingsboard.server.common.msg.TbMsg;
35 import org.thingsboard.server.common.msg.TbMsgDataType; 34 import org.thingsboard.server.common.msg.TbMsgDataType;
36 import org.thingsboard.server.common.msg.TbMsgMetaData; 35 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -61,11 +60,11 @@ public class QueueBenchmark implements CommandLineRunner { @@ -61,11 +60,11 @@ public class QueueBenchmark implements CommandLineRunner {
61 } 60 }
62 61
63 @Autowired 62 @Autowired
64 - private MsqQueue msqQueue; 63 + private MsgQueue msgQueue;
65 64
66 @Override 65 @Override
67 public void run(String... strings) throws Exception { 66 public void run(String... strings) throws Exception {
68 - System.out.println("It works + " + msqQueue); 67 + System.out.println("It works + " + msgQueue);
69 68
70 69
71 long start = System.currentTimeMillis(); 70 long start = System.currentTimeMillis();
@@ -81,8 +80,8 @@ public class QueueBenchmark implements CommandLineRunner { @@ -81,8 +80,8 @@ public class QueueBenchmark implements CommandLineRunner {
81 try { 80 try {
82 TbMsg msg = randomMsg(); 81 TbMsg msg = randomMsg();
83 UUID nodeId = UUIDs.timeBased(); 82 UUID nodeId = UUIDs.timeBased();
84 - ListenableFuture<Void> put = msqQueue.put(msg, nodeId, 100L);  
85 -// ListenableFuture<Void> put = msqQueue.ack(msg, nodeId, 100L); 83 + ListenableFuture<Void> put = msgQueue.put(msg, nodeId, 100L);
  84 +// ListenableFuture<Void> put = msgQueue.ack(msg, nodeId, 100L);
86 Futures.addCallback(put, new FutureCallback<Void>() { 85 Futures.addCallback(put, new FutureCallback<Void>() {
87 @Override 86 @Override
88 public void onSuccess(@Nullable Void result) { 87 public void onSuccess(@Nullable Void result) {
@@ -126,7 +125,7 @@ public class QueueBenchmark implements CommandLineRunner { @@ -126,7 +125,7 @@ public class QueueBenchmark implements CommandLineRunner {
126 TbMsgMetaData metaData = new TbMsgMetaData(); 125 TbMsgMetaData metaData = new TbMsgMetaData();
127 metaData.putValue("key", "value"); 126 metaData.putValue("key", "value");
128 String dataStr = "someContent"; 127 String dataStr = "someContent";
129 - return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr); 128 + return new TbMsg(UUIDs.timeBased(), "type", null, metaData, TbMsgDataType.JSON, dataStr, new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
130 } 129 }
131 130
132 @Bean 131 @Bean
dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/CassandraMsgQueue.java renamed from dao/src/main/java/org/thingsboard/server/dao/service/queue/cassandra/CassandraMsqQueue.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j; @@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
22 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.beans.factory.annotation.Autowired;
23 import org.springframework.stereotype.Component; 23 import org.springframework.stereotype.Component;
24 import org.thingsboard.server.common.msg.TbMsg; 24 import org.thingsboard.server.common.msg.TbMsg;
25 -import org.thingsboard.server.dao.queue.MsqQueue; 25 +import org.thingsboard.server.dao.queue.MsgQueue;
26 import org.thingsboard.server.dao.service.queue.cassandra.repository.AckRepository; 26 import org.thingsboard.server.dao.service.queue.cassandra.repository.AckRepository;
27 import org.thingsboard.server.dao.service.queue.cassandra.repository.MsgRepository; 27 import org.thingsboard.server.dao.service.queue.cassandra.repository.MsgRepository;
28 import org.thingsboard.server.dao.util.NoSqlDao; 28 import org.thingsboard.server.dao.util.NoSqlDao;
@@ -33,7 +33,7 @@ import java.util.UUID; @@ -33,7 +33,7 @@ import java.util.UUID;
33 @Component 33 @Component
34 @Slf4j 34 @Slf4j
35 @NoSqlDao 35 @NoSqlDao
36 -public class CassandraMsqQueue implements MsqQueue { 36 +public class CassandraMsgQueue implements MsgQueue {
37 37
38 @Autowired 38 @Autowired
39 private MsgRepository msgRepository; 39 private MsgRepository msgRepository;
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sql.queue;
  17 +
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.springframework.stereotype.Component;
  22 +import org.thingsboard.server.common.msg.TbMsg;
  23 +import org.thingsboard.server.dao.queue.MsgQueue;
  24 +import org.thingsboard.server.dao.util.SqlDao;
  25 +
  26 +import java.util.Collections;
  27 +import java.util.UUID;
  28 +
  29 +/**
  30 + * Created by ashvayka on 27.04.18.
  31 + */
  32 +@Component
  33 +@Slf4j
  34 +@SqlDao
  35 +public class DummySqlMsgQueue implements MsgQueue {
  36 + @Override
  37 + public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
  38 + return Futures.immediateFuture(null);
  39 + }
  40 +
  41 + @Override
  42 + public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition) {
  43 + return Futures.immediateFuture(null);
  44 + }
  45 +
  46 + @Override
  47 + public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition) {
  48 + return Collections.emptyList();
  49 + }
  50 +}
@@ -33,8 +33,8 @@ public class UnprocessedMsgFilterTest { @@ -33,8 +33,8 @@ public class UnprocessedMsgFilterTest {
33 public void acknowledgedMsgsAreFilteredOut() { 33 public void acknowledgedMsgsAreFilteredOut() {
34 UUID id1 = UUID.randomUUID(); 34 UUID id1 = UUID.randomUUID();
35 UUID id2 = UUID.randomUUID(); 35 UUID id2 = UUID.randomUUID();
36 - TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null);  
37 - TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null); 36 + TbMsg msg1 = new TbMsg(id1, "T", null, null, null, null, null, null, 0L);
  37 + TbMsg msg2 = new TbMsg(id2, "T", null, null, null, null, null, null, 0L);
38 List<TbMsg> msgs = Lists.newArrayList(msg1, msg2); 38 List<TbMsg> msgs = Lists.newArrayList(msg1, msg2);
39 List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L)); 39 List<MsgAck> acks = Lists.newArrayList(new MsgAck(id2, UUID.randomUUID(), 1L, 1L));
40 Collection<TbMsg> actual = msgFilter.filter(msgs, acks); 40 Collection<TbMsg> actual = msgFilter.filter(msgs, acks);
@@ -23,6 +23,8 @@ import org.junit.Before; @@ -23,6 +23,8 @@ import org.junit.Before;
23 import org.junit.Test; 23 import org.junit.Test;
24 import org.springframework.beans.factory.annotation.Autowired; 24 import org.springframework.beans.factory.annotation.Autowired;
25 import org.thingsboard.server.common.data.id.DeviceId; 25 import org.thingsboard.server.common.data.id.DeviceId;
  26 +import org.thingsboard.server.common.data.id.RuleChainId;
  27 +import org.thingsboard.server.common.data.id.RuleNodeId;
26 import org.thingsboard.server.common.msg.TbMsg; 28 import org.thingsboard.server.common.msg.TbMsg;
27 import org.thingsboard.server.common.msg.TbMsgDataType; 29 import org.thingsboard.server.common.msg.TbMsgDataType;
28 import org.thingsboard.server.common.msg.TbMsgMetaData; 30 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -45,7 +47,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest { @@ -45,7 +47,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
45 47
46 @Test 48 @Test
47 public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException { 49 public void msgCanBeSavedAndRead() throws ExecutionException, InterruptedException {
48 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000"); 50 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
  51 + new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
49 UUID nodeId = UUIDs.timeBased(); 52 UUID nodeId = UUIDs.timeBased();
50 ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L); 53 ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
51 future.get(); 54 future.get();
@@ -55,7 +58,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest { @@ -55,7 +58,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
55 58
56 @Test 59 @Test
57 public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException { 60 public void expiredMsgsAreNotReturned() throws ExecutionException, InterruptedException {
58 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000"); 61 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), null, TbMsgDataType.JSON, "0000",
  62 + new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
59 UUID nodeId = UUIDs.timeBased(); 63 UUID nodeId = UUIDs.timeBased();
60 ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L); 64 ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 2L, 2L, 2L);
61 future.get(); 65 future.get();
@@ -68,7 +72,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest { @@ -68,7 +72,8 @@ public class CassandraMsgRepositoryTest extends AbstractServiceTest {
68 TbMsgMetaData metaData = new TbMsgMetaData(); 72 TbMsgMetaData metaData = new TbMsgMetaData();
69 metaData.putValue("key", "value"); 73 metaData.putValue("key", "value");
70 String dataStr = "someContent"; 74 String dataStr = "someContent";
71 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr); 75 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "type", new DeviceId(UUIDs.timeBased()), metaData, TbMsgDataType.JSON, dataStr,
  76 + new RuleChainId(UUIDs.timeBased()), new RuleNodeId(UUIDs.timeBased()), 0L);
72 UUID nodeId = UUIDs.timeBased(); 77 UUID nodeId = UUIDs.timeBased();
73 ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L); 78 ListenableFuture<Void> future = msgRepository.save(msg, nodeId, 1L, 1L, 1L);
74 future.get(); 79 future.get();
@@ -91,11 +91,11 @@ public class TbAlarmNode implements TbNode { @@ -91,11 +91,11 @@ public class TbAlarmNode implements TbNode {
91 if (alarmResult.alarm == null) { 91 if (alarmResult.alarm == null) {
92 ctx.tellNext(msg, "False"); 92 ctx.tellNext(msg, "False");
93 } else if (alarmResult.isCreated) { 93 } else if (alarmResult.isCreated) {
94 - ctx.tellNext(toAlarmMsg(alarmResult, msg), "Created"); 94 + ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Created");
95 } else if (alarmResult.isUpdated) { 95 } else if (alarmResult.isUpdated) {
96 - ctx.tellNext(toAlarmMsg(alarmResult, msg), "Updated"); 96 + ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated");
97 } else if (alarmResult.isCleared) { 97 } else if (alarmResult.isCleared) {
98 - ctx.tellNext(toAlarmMsg(alarmResult, msg), "Cleared"); 98 + ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared");
99 } 99 }
100 }, 100 },
101 t -> ctx.tellError(msg, t)); 101 t -> ctx.tellError(msg, t));
@@ -176,7 +176,7 @@ public class TbAlarmNode implements TbNode { @@ -176,7 +176,7 @@ public class TbAlarmNode implements TbNode {
176 return ctx.getJsExecutor().executeAsync(() -> buildDetailsJsEngine.executeJson(msg)); 176 return ctx.getJsExecutor().executeAsync(() -> buildDetailsJsEngine.executeJson(msg));
177 } 177 }
178 178
179 - private TbMsg toAlarmMsg(AlarmResult alarmResult, TbMsg originalMsg) { 179 + private TbMsg toAlarmMsg(TbContext ctx, AlarmResult alarmResult, TbMsg originalMsg) {
180 JsonNode jsonNodes = mapper.valueToTree(alarmResult.alarm); 180 JsonNode jsonNodes = mapper.valueToTree(alarmResult.alarm);
181 String data = jsonNodes.toString(); 181 String data = jsonNodes.toString();
182 TbMsgMetaData metaData = originalMsg.getMetaData().copy(); 182 TbMsgMetaData metaData = originalMsg.getMetaData().copy();
@@ -187,7 +187,7 @@ public class TbAlarmNode implements TbNode { @@ -187,7 +187,7 @@ public class TbAlarmNode implements TbNode {
187 } else if (alarmResult.isCleared) { 187 } else if (alarmResult.isCleared) {
188 metaData.putValue(IS_CLEARED_ALARM, Boolean.TRUE.toString()); 188 metaData.putValue(IS_CLEARED_ALARM, Boolean.TRUE.toString());
189 } 189 }
190 - return new TbMsg(UUIDs.timeBased(), "ALARM", originalMsg.getOriginator(), metaData, data); 190 + return ctx.newMsg("ALARM", originalMsg.getOriginator(), metaData, data);
191 } 191 }
192 192
193 193
@@ -78,18 +78,18 @@ public class TbMsgGeneratorNode implements TbNode { @@ -78,18 +78,18 @@ public class TbMsgGeneratorNode implements TbNode {
78 } 78 }
79 79
80 private void sentTickMsg(TbContext ctx) { 80 private void sentTickMsg(TbContext ctx) {
81 - TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); 81 + TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
82 nextTickId = tickMsg.getId(); 82 nextTickId = tickMsg.getId();
83 ctx.tellSelf(tickMsg, delay); 83 ctx.tellSelf(tickMsg, delay);
84 } 84 }
85 85
86 - protected ListenableFuture<TbMsg> generate(TbContext ctx) { 86 + private ListenableFuture<TbMsg> generate(TbContext ctx) {
87 return ctx.getJsExecutor().executeAsync(() -> { 87 return ctx.getJsExecutor().executeAsync(() -> {
88 if (prevMsg == null) { 88 if (prevMsg == null) {
89 - prevMsg = new TbMsg(UUIDs.timeBased(), "", originatorId, new TbMsgMetaData(), "{}"); 89 + prevMsg = ctx.newMsg( "", originatorId, new TbMsgMetaData(), "{}");
90 } 90 }
91 TbMsg generated = jsEngine.executeGenerate(prevMsg); 91 TbMsg generated = jsEngine.executeGenerate(prevMsg);
92 - prevMsg = new TbMsg(UUIDs.timeBased(), generated.getType(), originatorId, generated.getMetaData(), generated.getData()); 92 + prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());
93 return prevMsg; 93 return prevMsg;
94 }); 94 });
95 } 95 }
@@ -61,7 +61,7 @@ public class TbJsSwitchNode implements TbNode { @@ -61,7 +61,7 @@ public class TbJsSwitchNode implements TbNode {
61 ctx.tellNext(msg, nextRelations); 61 ctx.tellNext(msg, nextRelations);
62 } 62 }
63 63
64 - @Override 64 + @Override
65 public void destroy() { 65 public void destroy() {
66 if (jsEngine != null) { 66 if (jsEngine != null) {
67 jsEngine.destroy(); 67 jsEngine.destroy();
@@ -76,7 +76,7 @@ public class TbMsgToEmailNode implements TbNode { @@ -76,7 +76,7 @@ public class TbMsgToEmailNode implements TbNode {
76 public void onMsg(TbContext ctx, TbMsg msg) { 76 public void onMsg(TbContext ctx, TbMsg msg) {
77 try { 77 try {
78 EmailPojo email = convert(msg); 78 EmailPojo email = convert(msg);
79 - TbMsg emailMsg = buildEmailMsg(msg, email); 79 + TbMsg emailMsg = buildEmailMsg(ctx, msg, email);
80 ctx.tellNext(emailMsg); 80 ctx.tellNext(emailMsg);
81 } catch (Exception ex) { 81 } catch (Exception ex) {
82 log.warn("Can not convert message to email " + ex.getMessage()); 82 log.warn("Can not convert message to email " + ex.getMessage());
@@ -84,9 +84,9 @@ public class TbMsgToEmailNode implements TbNode { @@ -84,9 +84,9 @@ public class TbMsgToEmailNode implements TbNode {
84 } 84 }
85 } 85 }
86 86
87 - private TbMsg buildEmailMsg(TbMsg msg, EmailPojo email) throws JsonProcessingException { 87 + private TbMsg buildEmailMsg(TbContext ctx, TbMsg msg, EmailPojo email) throws JsonProcessingException {
88 String emailJson = MAPPER.writeValueAsString(email); 88 String emailJson = MAPPER.writeValueAsString(email);
89 - return new TbMsg(UUIDs.timeBased(), SEND_EMAIL_TYPE, msg.getOriginator(), msg.getMetaData().copy(), emailJson); 89 + return ctx.newMsg(SEND_EMAIL_TYPE, msg.getOriginator(), msg.getMetaData().copy(), emailJson);
90 } 90 }
91 91
92 private EmailPojo convert(TbMsg msg) throws IOException { 92 private EmailPojo convert(TbMsg msg) throws IOException {
@@ -63,8 +63,6 @@ public class TbSendEmailNode implements TbNode { @@ -63,8 +63,6 @@ public class TbSendEmailNode implements TbNode {
63 } catch (Exception ex) { 63 } catch (Exception ex) {
64 ctx.tellError(msg, ex); 64 ctx.tellError(msg, ex);
65 } 65 }
66 -  
67 -  
68 } 66 }
69 67
70 private EmailPojo getEmail(TbMsg msg) throws IOException { 68 private EmailPojo getEmail(TbMsg msg) throws IOException {
@@ -84,6 +82,5 @@ public class TbSendEmailNode implements TbNode { @@ -84,6 +82,5 @@ public class TbSendEmailNode implements TbNode {
84 82
85 @Override 83 @Override
86 public void destroy() { 84 public void destroy() {
87 -  
88 } 85 }
89 } 86 }
@@ -60,7 +60,7 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode { @@ -60,7 +60,7 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
60 @Override 60 @Override
61 protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) { 61 protected ListenableFuture<TbMsg> transform(TbContext ctx, TbMsg msg) {
62 ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator()); 62 ListenableFuture<? extends EntityId> newOriginator = getNewOriginator(ctx, msg.getOriginator());
63 - return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> new TbMsg(msg.getId(), msg.getType(), n, msg.getMetaData(), msg.getData())); 63 + return Futures.transform(newOriginator, (Function<EntityId, TbMsg>) n -> ctx.newMsg(msg.getType(), n, msg.getMetaData(), msg.getData()));
64 } 64 }
65 65
66 private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) { 66 private ListenableFuture<? extends EntityId> getNewOriginator(TbContext ctx, EntityId original) {
@@ -35,6 +35,8 @@ import org.thingsboard.rule.engine.api.*; @@ -35,6 +35,8 @@ import org.thingsboard.rule.engine.api.*;
35 import org.thingsboard.server.common.data.alarm.Alarm; 35 import org.thingsboard.server.common.data.alarm.Alarm;
36 import org.thingsboard.server.common.data.id.DeviceId; 36 import org.thingsboard.server.common.data.id.DeviceId;
37 import org.thingsboard.server.common.data.id.EntityId; 37 import org.thingsboard.server.common.data.id.EntityId;
  38 +import org.thingsboard.server.common.data.id.RuleChainId;
  39 +import org.thingsboard.server.common.data.id.RuleNodeId;
38 import org.thingsboard.server.common.data.id.TenantId; 40 import org.thingsboard.server.common.data.id.TenantId;
39 import org.thingsboard.server.common.msg.TbMsg; 41 import org.thingsboard.server.common.msg.TbMsg;
40 import org.thingsboard.server.common.msg.TbMsgMetaData; 42 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -73,6 +75,9 @@ public class TbAlarmNodeTest { @@ -73,6 +75,9 @@ public class TbAlarmNodeTest {
73 @Mock 75 @Mock
74 private ScriptEngine detailsJs; 76 private ScriptEngine detailsJs;
75 77
  78 + private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  79 + private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  80 +
76 private ListeningExecutor dbExecutor; 81 private ListeningExecutor dbExecutor;
77 82
78 private EntityId originator = new DeviceId(UUIDs.timeBased()); 83 private EntityId originator = new DeviceId(UUIDs.timeBased());
@@ -103,7 +108,7 @@ public class TbAlarmNodeTest { @@ -103,7 +108,7 @@ public class TbAlarmNodeTest {
103 public void newAlarmCanBeCreated() throws ScriptException, IOException { 108 public void newAlarmCanBeCreated() throws ScriptException, IOException {
104 initWithScript(); 109 initWithScript();
105 metaData.putValue("key", "value"); 110 metaData.putValue("key", "value");
106 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); 111 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
107 112
108 when(createJs.executeFilter(msg)).thenReturn(true); 113 when(createJs.executeFilter(msg)).thenReturn(true);
109 when(detailsJs.executeJson(msg)).thenReturn(null); 114 when(detailsJs.executeJson(msg)).thenReturn(null);
@@ -113,17 +118,21 @@ public class TbAlarmNodeTest { @@ -113,17 +118,21 @@ public class TbAlarmNodeTest {
113 118
114 node.onMsg(ctx, msg); 119 node.onMsg(ctx, msg);
115 120
116 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
117 - verify(ctx).tellNext(captor.capture(), eq("Created"));  
118 - TbMsg actualMsg = captor.getValue(); 121 + verify(ctx).tellNext(any(), eq("Created"));
  122 +
  123 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  124 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  125 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  126 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  127 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
119 128
120 - assertEquals("ALARM", actualMsg.getType());  
121 - assertEquals(originator, actualMsg.getOriginator());  
122 - assertEquals("value", actualMsg.getMetaData().getValue("key"));  
123 - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));  
124 - assertNotSame(metaData, actualMsg.getMetaData()); 129 + assertEquals("ALARM", typeCaptor.getValue());
  130 + assertEquals(originator, originatorCaptor.getValue());
  131 + assertEquals("value", metadataCaptor.getValue().getValue("key"));
  132 + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
  133 + assertNotSame(metaData, metadataCaptor.getValue());
125 134
126 - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); 135 + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
127 Alarm expectedAlarm = Alarm.builder() 136 Alarm expectedAlarm = Alarm.builder()
128 .tenantId(tenantId) 137 .tenantId(tenantId)
129 .originator(originator) 138 .originator(originator)
@@ -143,7 +152,7 @@ public class TbAlarmNodeTest { @@ -143,7 +152,7 @@ public class TbAlarmNodeTest {
143 public void shouldCreateScriptThrowsException() throws ScriptException { 152 public void shouldCreateScriptThrowsException() throws ScriptException {
144 initWithScript(); 153 initWithScript();
145 metaData.putValue("key", "value"); 154 metaData.putValue("key", "value");
146 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); 155 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
147 156
148 when(createJs.executeFilter(msg)).thenThrow(new NotImplementedException("message")); 157 when(createJs.executeFilter(msg)).thenThrow(new NotImplementedException("message"));
149 158
@@ -165,7 +174,7 @@ public class TbAlarmNodeTest { @@ -165,7 +174,7 @@ public class TbAlarmNodeTest {
165 public void buildDetailsThrowsException() throws ScriptException, IOException { 174 public void buildDetailsThrowsException() throws ScriptException, IOException {
166 initWithScript(); 175 initWithScript();
167 metaData.putValue("key", "value"); 176 metaData.putValue("key", "value");
168 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); 177 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
169 178
170 when(createJs.executeFilter(msg)).thenReturn(true); 179 when(createJs.executeFilter(msg)).thenReturn(true);
171 when(detailsJs.executeJson(msg)).thenThrow(new NotImplementedException("message")); 180 when(detailsJs.executeJson(msg)).thenThrow(new NotImplementedException("message"));
@@ -191,7 +200,7 @@ public class TbAlarmNodeTest { @@ -191,7 +200,7 @@ public class TbAlarmNodeTest {
191 public void ifAlarmClearedCreateNew() throws ScriptException, IOException { 200 public void ifAlarmClearedCreateNew() throws ScriptException, IOException {
192 initWithScript(); 201 initWithScript();
193 metaData.putValue("key", "value"); 202 metaData.putValue("key", "value");
194 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); 203 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
195 204
196 Alarm clearedAlarm = Alarm.builder().status(CLEARED_ACK).build(); 205 Alarm clearedAlarm = Alarm.builder().status(CLEARED_ACK).build();
197 206
@@ -203,17 +212,22 @@ public class TbAlarmNodeTest { @@ -203,17 +212,22 @@ public class TbAlarmNodeTest {
203 212
204 node.onMsg(ctx, msg); 213 node.onMsg(ctx, msg);
205 214
206 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
207 - verify(ctx).tellNext(captor.capture(), eq("Created"));  
208 - TbMsg actualMsg = captor.getValue(); 215 + verify(ctx).tellNext(any(), eq("Created"));
  216 +
  217 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  218 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  219 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  220 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  221 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
209 222
210 - assertEquals("ALARM", actualMsg.getType());  
211 - assertEquals(originator, actualMsg.getOriginator());  
212 - assertEquals("value", actualMsg.getMetaData().getValue("key"));  
213 - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));  
214 - assertNotSame(metaData, actualMsg.getMetaData()); 223 + assertEquals("ALARM", typeCaptor.getValue());
  224 + assertEquals(originator, originatorCaptor.getValue());
  225 + assertEquals("value", metadataCaptor.getValue().getValue("key"));
  226 + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
  227 + assertNotSame(metaData, metadataCaptor.getValue());
215 228
216 - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); 229 +
  230 + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
217 Alarm expectedAlarm = Alarm.builder() 231 Alarm expectedAlarm = Alarm.builder()
218 .tenantId(tenantId) 232 .tenantId(tenantId)
219 .originator(originator) 233 .originator(originator)
@@ -233,7 +247,7 @@ public class TbAlarmNodeTest { @@ -233,7 +247,7 @@ public class TbAlarmNodeTest {
233 public void alarmCanBeUpdated() throws ScriptException, IOException { 247 public void alarmCanBeUpdated() throws ScriptException, IOException {
234 initWithScript(); 248 initWithScript();
235 metaData.putValue("key", "value"); 249 metaData.putValue("key", "value");
236 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); 250 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
237 251
238 long oldEndDate = System.currentTimeMillis(); 252 long oldEndDate = System.currentTimeMillis();
239 Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build(); 253 Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
@@ -247,17 +261,21 @@ public class TbAlarmNodeTest { @@ -247,17 +261,21 @@ public class TbAlarmNodeTest {
247 261
248 node.onMsg(ctx, msg); 262 node.onMsg(ctx, msg);
249 263
250 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
251 - verify(ctx).tellNext(captor.capture(), eq("Updated"));  
252 - TbMsg actualMsg = captor.getValue(); 264 + verify(ctx).tellNext(any(), eq("Updated"));
  265 +
  266 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  267 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  268 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  269 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  270 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
253 271
254 - assertEquals("ALARM", actualMsg.getType());  
255 - assertEquals(originator, actualMsg.getOriginator());  
256 - assertEquals("value", actualMsg.getMetaData().getValue("key"));  
257 - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_EXISTING_ALARM));  
258 - assertNotSame(metaData, actualMsg.getMetaData()); 272 + assertEquals("ALARM", typeCaptor.getValue());
  273 + assertEquals(originator, originatorCaptor.getValue());
  274 + assertEquals("value", metadataCaptor.getValue().getValue("key"));
  275 + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_EXISTING_ALARM));
  276 + assertNotSame(metaData, metadataCaptor.getValue());
259 277
260 - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); 278 + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
261 assertTrue(activeAlarm.getEndTs() > oldEndDate); 279 assertTrue(activeAlarm.getEndTs() > oldEndDate);
262 Alarm expectedAlarm = Alarm.builder() 280 Alarm expectedAlarm = Alarm.builder()
263 .tenantId(tenantId) 281 .tenantId(tenantId)
@@ -279,7 +297,7 @@ public class TbAlarmNodeTest { @@ -279,7 +297,7 @@ public class TbAlarmNodeTest {
279 public void alarmCanBeCleared() throws ScriptException, IOException { 297 public void alarmCanBeCleared() throws ScriptException, IOException {
280 initWithScript(); 298 initWithScript();
281 metaData.putValue("key", "value"); 299 metaData.putValue("key", "value");
282 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); 300 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
283 301
284 long oldEndDate = System.currentTimeMillis(); 302 long oldEndDate = System.currentTimeMillis();
285 Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build(); 303 Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
@@ -293,17 +311,21 @@ public class TbAlarmNodeTest { @@ -293,17 +311,21 @@ public class TbAlarmNodeTest {
293 311
294 node.onMsg(ctx, msg); 312 node.onMsg(ctx, msg);
295 313
296 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
297 - verify(ctx).tellNext(captor.capture(), eq("Cleared"));  
298 - TbMsg actualMsg = captor.getValue(); 314 + verify(ctx).tellNext(any(), eq("Cleared"));
  315 +
  316 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  317 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  318 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  319 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  320 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
299 321
300 - assertEquals("ALARM", actualMsg.getType());  
301 - assertEquals(originator, actualMsg.getOriginator());  
302 - assertEquals("value", actualMsg.getMetaData().getValue("key"));  
303 - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_CLEARED_ALARM));  
304 - assertNotSame(metaData, actualMsg.getMetaData()); 322 + assertEquals("ALARM", typeCaptor.getValue());
  323 + assertEquals(originator, originatorCaptor.getValue());
  324 + assertEquals("value", metadataCaptor.getValue().getValue("key"));
  325 + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_CLEARED_ALARM));
  326 + assertNotSame(metaData, metadataCaptor.getValue());
305 327
306 - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); 328 + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
307 Alarm expectedAlarm = Alarm.builder() 329 Alarm expectedAlarm = Alarm.builder()
308 .tenantId(tenantId) 330 .tenantId(tenantId)
309 .originator(originator) 331 .originator(originator)
@@ -27,6 +27,8 @@ import org.mockito.Mock; @@ -27,6 +27,8 @@ import org.mockito.Mock;
27 import org.mockito.runners.MockitoJUnitRunner; 27 import org.mockito.runners.MockitoJUnitRunner;
28 import org.mockito.stubbing.Answer; 28 import org.mockito.stubbing.Answer;
29 import org.thingsboard.rule.engine.api.*; 29 import org.thingsboard.rule.engine.api.*;
  30 +import org.thingsboard.server.common.data.id.RuleChainId;
  31 +import org.thingsboard.server.common.data.id.RuleNodeId;
30 import org.thingsboard.server.common.msg.TbMsg; 32 import org.thingsboard.server.common.msg.TbMsg;
31 import org.thingsboard.server.common.msg.TbMsgMetaData; 33 import org.thingsboard.server.common.msg.TbMsgMetaData;
32 34
@@ -48,10 +50,13 @@ public class TbJsFilterNodeTest { @@ -48,10 +50,13 @@ public class TbJsFilterNodeTest {
48 @Mock 50 @Mock
49 private ScriptEngine scriptEngine; 51 private ScriptEngine scriptEngine;
50 52
  53 + private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  54 + private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  55 +
51 @Test 56 @Test
52 public void falseEvaluationDoNotSendMsg() throws TbNodeException, ScriptException { 57 public void falseEvaluationDoNotSendMsg() throws TbNodeException, ScriptException {
53 initWithScript(); 58 initWithScript();
54 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}"); 59 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
55 mockJsExecutor(); 60 mockJsExecutor();
56 when(scriptEngine.executeFilter(msg)).thenReturn(false); 61 when(scriptEngine.executeFilter(msg)).thenReturn(false);
57 62
@@ -64,7 +69,7 @@ public class TbJsFilterNodeTest { @@ -64,7 +69,7 @@ public class TbJsFilterNodeTest {
64 public void exceptionInJsThrowsException() throws TbNodeException, ScriptException { 69 public void exceptionInJsThrowsException() throws TbNodeException, ScriptException {
65 initWithScript(); 70 initWithScript();
66 TbMsgMetaData metaData = new TbMsgMetaData(); 71 TbMsgMetaData metaData = new TbMsgMetaData();
67 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}"); 72 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}", ruleChainId, ruleNodeId, 0L);
68 mockJsExecutor(); 73 mockJsExecutor();
69 when(scriptEngine.executeFilter(msg)).thenThrow(new ScriptException("error")); 74 when(scriptEngine.executeFilter(msg)).thenThrow(new ScriptException("error"));
70 75
@@ -77,7 +82,7 @@ public class TbJsFilterNodeTest { @@ -77,7 +82,7 @@ public class TbJsFilterNodeTest {
77 public void metadataConditionCanBeTrue() throws TbNodeException, ScriptException { 82 public void metadataConditionCanBeTrue() throws TbNodeException, ScriptException {
78 initWithScript(); 83 initWithScript();
79 TbMsgMetaData metaData = new TbMsgMetaData(); 84 TbMsgMetaData metaData = new TbMsgMetaData();
80 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}"); 85 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{}", ruleChainId, ruleNodeId, 0L);
81 mockJsExecutor(); 86 mockJsExecutor();
82 when(scriptEngine.executeFilter(msg)).thenReturn(true); 87 when(scriptEngine.executeFilter(msg)).thenReturn(true);
83 88
@@ -28,6 +28,8 @@ import org.mockito.Mock; @@ -28,6 +28,8 @@ import org.mockito.Mock;
28 import org.mockito.runners.MockitoJUnitRunner; 28 import org.mockito.runners.MockitoJUnitRunner;
29 import org.mockito.stubbing.Answer; 29 import org.mockito.stubbing.Answer;
30 import org.thingsboard.rule.engine.api.*; 30 import org.thingsboard.rule.engine.api.*;
  31 +import org.thingsboard.server.common.data.id.RuleChainId;
  32 +import org.thingsboard.server.common.data.id.RuleNodeId;
31 import org.thingsboard.server.common.msg.TbMsg; 33 import org.thingsboard.server.common.msg.TbMsg;
32 import org.thingsboard.server.common.msg.TbMsgMetaData; 34 import org.thingsboard.server.common.msg.TbMsgMetaData;
33 35
@@ -51,6 +53,9 @@ public class TbJsSwitchNodeTest { @@ -51,6 +53,9 @@ public class TbJsSwitchNodeTest {
51 @Mock 53 @Mock
52 private ScriptEngine scriptEngine; 54 private ScriptEngine scriptEngine;
53 55
  56 + private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  57 + private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  58 +
54 @Test 59 @Test
55 public void multipleRoutesAreAllowed() throws TbNodeException, ScriptException { 60 public void multipleRoutesAreAllowed() throws TbNodeException, ScriptException {
56 initWithScript(); 61 initWithScript();
@@ -59,7 +64,7 @@ public class TbJsSwitchNodeTest { @@ -59,7 +64,7 @@ public class TbJsSwitchNodeTest {
59 metaData.putValue("humidity", "99"); 64 metaData.putValue("humidity", "99");
60 String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; 65 String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
61 66
62 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 67 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
63 mockJsExecutor(); 68 mockJsExecutor();
64 when(scriptEngine.executeSwitch(msg)).thenReturn(Sets.newHashSet("one", "three")); 69 when(scriptEngine.executeSwitch(msg)).thenReturn(Sets.newHashSet("one", "three"));
65 70
@@ -27,6 +27,8 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; @@ -27,6 +27,8 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
27 import org.thingsboard.rule.engine.api.TbNodeException; 27 import org.thingsboard.rule.engine.api.TbNodeException;
28 import org.thingsboard.server.common.data.id.DeviceId; 28 import org.thingsboard.server.common.data.id.DeviceId;
29 import org.thingsboard.server.common.data.id.EntityId; 29 import org.thingsboard.server.common.data.id.EntityId;
  30 +import org.thingsboard.server.common.data.id.RuleChainId;
  31 +import org.thingsboard.server.common.data.id.RuleNodeId;
30 import org.thingsboard.server.common.msg.TbMsg; 32 import org.thingsboard.server.common.msg.TbMsg;
31 import org.thingsboard.server.common.msg.TbMsgMetaData; 33 import org.thingsboard.server.common.msg.TbMsgMetaData;
32 34
@@ -34,7 +36,9 @@ import java.io.IOException; @@ -34,7 +36,9 @@ import java.io.IOException;
34 36
35 import static org.junit.Assert.assertEquals; 37 import static org.junit.Assert.assertEquals;
36 import static org.junit.Assert.assertNotSame; 38 import static org.junit.Assert.assertNotSame;
  39 +import static org.mockito.Matchers.any;
37 import static org.mockito.Mockito.verify; 40 import static org.mockito.Mockito.verify;
  41 +import static org.mockito.Mockito.when;
38 42
39 @RunWith(MockitoJUnitRunner.class) 43 @RunWith(MockitoJUnitRunner.class)
40 public class TbMsgToEmailNodeTest { 44 public class TbMsgToEmailNodeTest {
@@ -48,26 +52,31 @@ public class TbMsgToEmailNodeTest { @@ -48,26 +52,31 @@ public class TbMsgToEmailNodeTest {
48 private TbMsgMetaData metaData = new TbMsgMetaData(); 52 private TbMsgMetaData metaData = new TbMsgMetaData();
49 private String rawJson = "{\"name\": \"temp\", \"passed\": 5 , \"complex\": {\"val\":12, \"count\":100}}"; 53 private String rawJson = "{\"name\": \"temp\", \"passed\": 5 , \"complex\": {\"val\":12, \"count\":100}}";
50 54
  55 + private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  56 + private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  57 +
51 @Test 58 @Test
52 public void msgCanBeConverted() throws IOException { 59 public void msgCanBeConverted() throws IOException {
53 initWithScript(); 60 initWithScript();
54 metaData.putValue("username", "oreo"); 61 metaData.putValue("username", "oreo");
55 metaData.putValue("userEmail", "user@email.io"); 62 metaData.putValue("userEmail", "user@email.io");
56 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson); 63 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", originator, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
57 64
58 emailNode.onMsg(ctx, msg); 65 emailNode.onMsg(ctx, msg);
59 66
60 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
61 - verify(ctx).tellNext(captor.capture());  
62 - TbMsg actualMsg = captor.getValue(); 67 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  68 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  69 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  70 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  71 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
63 72
64 - assertEquals("SEND_EMAIL", actualMsg.getType());  
65 - assertEquals(originator, actualMsg.getOriginator());  
66 - assertEquals("oreo", actualMsg.getMetaData().getValue("username"));  
67 - assertNotSame(metaData, actualMsg.getMetaData());  
68 73
  74 + assertEquals("SEND_EMAIL", typeCaptor.getValue());
  75 + assertEquals(originator, originatorCaptor.getValue());
  76 + assertEquals("oreo", metadataCaptor.getValue().getValue("username"));
  77 + assertNotSame(metaData, metadataCaptor.getValue());
69 78
70 - EmailPojo actual = new ObjectMapper().readValue(actualMsg.getData().getBytes(), EmailPojo.class); 79 + EmailPojo actual = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), EmailPojo.class);
71 80
72 EmailPojo expected = new EmailPojo.EmailPojoBuilder() 81 EmailPojo expected = new EmailPojo.EmailPojoBuilder()
73 .from("test@mail.org") 82 .from("test@mail.org")
@@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.asset.Asset; @@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.asset.Asset;
34 import org.thingsboard.server.common.data.id.AssetId; 34 import org.thingsboard.server.common.data.id.AssetId;
35 import org.thingsboard.server.common.data.id.CustomerId; 35 import org.thingsboard.server.common.data.id.CustomerId;
36 import org.thingsboard.server.common.data.id.DeviceId; 36 import org.thingsboard.server.common.data.id.DeviceId;
  37 +import org.thingsboard.server.common.data.id.RuleChainId;
  38 +import org.thingsboard.server.common.data.id.RuleNodeId;
37 import org.thingsboard.server.common.data.id.UserId; 39 import org.thingsboard.server.common.data.id.UserId;
38 import org.thingsboard.server.common.data.kv.*; 40 import org.thingsboard.server.common.data.kv.*;
39 import org.thingsboard.server.common.msg.TbMsg; 41 import org.thingsboard.server.common.msg.TbMsg;
@@ -77,6 +79,9 @@ public class TbGetCustomerAttributeNodeTest { @@ -77,6 +79,9 @@ public class TbGetCustomerAttributeNodeTest {
77 79
78 private TbMsg msg; 80 private TbMsg msg;
79 81
  82 + private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  83 + private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  84 +
80 @Before 85 @Before
81 public void init() throws TbNodeException { 86 public void init() throws TbNodeException {
82 TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration(); 87 TbGetEntityAttrNodeConfiguration config = new TbGetEntityAttrNodeConfiguration();
@@ -98,7 +103,8 @@ public class TbGetCustomerAttributeNodeTest { @@ -98,7 +103,8 @@ public class TbGetCustomerAttributeNodeTest {
98 User user = new User(); 103 User user = new User();
99 user.setCustomerId(customerId); 104 user.setCustomerId(customerId);
100 105
101 - msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}"); 106 +
  107 + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
102 108
103 when(ctx.getUserService()).thenReturn(userService); 109 when(ctx.getUserService()).thenReturn(userService);
104 when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user)); 110 when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -123,7 +129,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -123,7 +129,7 @@ public class TbGetCustomerAttributeNodeTest {
123 User user = new User(); 129 User user = new User();
124 user.setCustomerId(customerId); 130 user.setCustomerId(customerId);
125 131
126 - msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}"); 132 + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
127 133
128 when(ctx.getUserService()).thenReturn(userService); 134 when(ctx.getUserService()).thenReturn(userService);
129 when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user)); 135 when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -148,7 +154,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -148,7 +154,7 @@ public class TbGetCustomerAttributeNodeTest {
148 User user = new User(); 154 User user = new User();
149 user.setCustomerId(customerId); 155 user.setCustomerId(customerId);
150 156
151 - msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}"); 157 + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
152 158
153 when(ctx.getUserService()).thenReturn(userService); 159 when(ctx.getUserService()).thenReturn(userService);
154 when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null)); 160 when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(null));
@@ -166,7 +172,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -166,7 +172,7 @@ public class TbGetCustomerAttributeNodeTest {
166 @Test 172 @Test
167 public void customerAttributeAddedInMetadata() { 173 public void customerAttributeAddedInMetadata() {
168 CustomerId customerId = new CustomerId(UUIDs.timeBased()); 174 CustomerId customerId = new CustomerId(UUIDs.timeBased());
169 - msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), "{}"); 175 + msg = new TbMsg(UUIDs.timeBased(), "CUSTOMER", customerId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
170 entityAttributeFetched(customerId); 176 entityAttributeFetched(customerId);
171 } 177 }
172 178
@@ -177,7 +183,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -177,7 +183,7 @@ public class TbGetCustomerAttributeNodeTest {
177 User user = new User(); 183 User user = new User();
178 user.setCustomerId(customerId); 184 user.setCustomerId(customerId);
179 185
180 - msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}"); 186 + msg = new TbMsg(UUIDs.timeBased(), "USER", userId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
181 187
182 when(ctx.getUserService()).thenReturn(userService); 188 when(ctx.getUserService()).thenReturn(userService);
183 when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user)); 189 when(userService.findUserByIdAsync(userId)).thenReturn(Futures.immediateFuture(user));
@@ -192,7 +198,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -192,7 +198,7 @@ public class TbGetCustomerAttributeNodeTest {
192 Asset asset = new Asset(); 198 Asset asset = new Asset();
193 asset.setCustomerId(customerId); 199 asset.setCustomerId(customerId);
194 200
195 - msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), "{}"); 201 + msg = new TbMsg(UUIDs.timeBased(), "USER", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
196 202
197 when(ctx.getAssetService()).thenReturn(assetService); 203 when(ctx.getAssetService()).thenReturn(assetService);
198 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); 204 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
@@ -207,7 +213,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -207,7 +213,7 @@ public class TbGetCustomerAttributeNodeTest {
207 Device device = new Device(); 213 Device device = new Device();
208 device.setCustomerId(customerId); 214 device.setCustomerId(customerId);
209 215
210 - msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}"); 216 + msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
211 217
212 when(ctx.getDeviceService()).thenReturn(deviceService); 218 when(ctx.getDeviceService()).thenReturn(deviceService);
213 when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device)); 219 when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
@@ -234,7 +240,7 @@ public class TbGetCustomerAttributeNodeTest { @@ -234,7 +240,7 @@ public class TbGetCustomerAttributeNodeTest {
234 Device device = new Device(); 240 Device device = new Device();
235 device.setCustomerId(customerId); 241 device.setCustomerId(customerId);
236 242
237 - msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}"); 243 + msg = new TbMsg(UUIDs.timeBased(), "USER", deviceId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
238 244
239 when(ctx.getDeviceService()).thenReturn(deviceService); 245 when(ctx.getDeviceService()).thenReturn(deviceService);
240 when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device)); 246 when(deviceService.findDeviceByIdAsync(deviceId)).thenReturn(Futures.immediateFuture(device));
@@ -29,6 +29,9 @@ import org.thingsboard.rule.engine.api.TbNodeException; @@ -29,6 +29,9 @@ import org.thingsboard.rule.engine.api.TbNodeException;
29 import org.thingsboard.server.common.data.asset.Asset; 29 import org.thingsboard.server.common.data.asset.Asset;
30 import org.thingsboard.server.common.data.id.AssetId; 30 import org.thingsboard.server.common.data.id.AssetId;
31 import org.thingsboard.server.common.data.id.CustomerId; 31 import org.thingsboard.server.common.data.id.CustomerId;
  32 +import org.thingsboard.server.common.data.id.EntityId;
  33 +import org.thingsboard.server.common.data.id.RuleChainId;
  34 +import org.thingsboard.server.common.data.id.RuleNodeId;
32 import org.thingsboard.server.common.msg.TbMsg; 35 import org.thingsboard.server.common.msg.TbMsg;
33 import org.thingsboard.server.common.msg.TbMsgMetaData; 36 import org.thingsboard.server.common.msg.TbMsgMetaData;
34 import org.thingsboard.server.dao.asset.AssetService; 37 import org.thingsboard.server.dao.asset.AssetService;
@@ -57,17 +60,23 @@ public class TbChangeOriginatorNodeTest { @@ -57,17 +60,23 @@ public class TbChangeOriginatorNodeTest {
57 Asset asset = new Asset(); 60 Asset asset = new Asset();
58 asset.setCustomerId(customerId); 61 asset.setCustomerId(customerId);
59 62
60 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}"); 63 + RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  64 + RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  65 +
  66 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
61 67
62 when(ctx.getAssetService()).thenReturn(assetService); 68 when(ctx.getAssetService()).thenReturn(assetService);
63 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); 69 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
64 70
65 node.onMsg(ctx, msg); 71 node.onMsg(ctx, msg);
66 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
67 - verify(ctx).tellNext(captor.capture());  
68 - TbMsg actualMsg = captor.getValue();  
69 - assertEquals(customerId, actualMsg.getOriginator());  
70 - assertEquals(msg.getId(), actualMsg.getId()); 72 +
  73 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  74 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  75 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  76 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  77 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
  78 +
  79 + assertEquals(customerId, originatorCaptor.getValue());
71 } 80 }
72 81
73 @Test 82 @Test
@@ -78,17 +87,22 @@ public class TbChangeOriginatorNodeTest { @@ -78,17 +87,22 @@ public class TbChangeOriginatorNodeTest {
78 Asset asset = new Asset(); 87 Asset asset = new Asset();
79 asset.setCustomerId(customerId); 88 asset.setCustomerId(customerId);
80 89
81 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}"); 90 + RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  91 + RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  92 +
  93 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
82 94
83 when(ctx.getAssetService()).thenReturn(assetService); 95 when(ctx.getAssetService()).thenReturn(assetService);
84 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); 96 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
85 97
86 node.onMsg(ctx, msg); 98 node.onMsg(ctx, msg);
87 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
88 - verify(ctx).spawn(captor.capture());  
89 - TbMsg actualMsg = captor.getValue();  
90 - assertEquals(customerId, actualMsg.getOriginator());  
91 - assertEquals(msg.getId(), actualMsg.getId()); 99 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  100 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  101 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  102 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  103 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
  104 +
  105 + assertEquals(customerId, originatorCaptor.getValue());
92 } 106 }
93 107
94 @Test 108 @Test
@@ -99,7 +113,10 @@ public class TbChangeOriginatorNodeTest { @@ -99,7 +113,10 @@ public class TbChangeOriginatorNodeTest {
99 Asset asset = new Asset(); 113 Asset asset = new Asset();
100 asset.setCustomerId(customerId); 114 asset.setCustomerId(customerId);
101 115
102 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}"); 116 + RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  117 + RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  118 +
  119 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "ASSET", assetId, new TbMsgMetaData(), "{}", ruleChainId, ruleNodeId, 0L);
103 120
104 when(ctx.getAssetService()).thenReturn(assetService); 121 when(ctx.getAssetService()).thenReturn(assetService);
105 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong"))); 122 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFailedFuture(new IllegalStateException("wrong")));
@@ -27,6 +27,8 @@ import org.mockito.Mock; @@ -27,6 +27,8 @@ import org.mockito.Mock;
27 import org.mockito.runners.MockitoJUnitRunner; 27 import org.mockito.runners.MockitoJUnitRunner;
28 import org.mockito.stubbing.Answer; 28 import org.mockito.stubbing.Answer;
29 import org.thingsboard.rule.engine.api.*; 29 import org.thingsboard.rule.engine.api.*;
  30 +import org.thingsboard.server.common.data.id.RuleChainId;
  31 +import org.thingsboard.server.common.data.id.RuleNodeId;
30 import org.thingsboard.server.common.msg.TbMsg; 32 import org.thingsboard.server.common.msg.TbMsg;
31 import org.thingsboard.server.common.msg.TbMsgMetaData; 33 import org.thingsboard.server.common.msg.TbMsgMetaData;
32 34
@@ -56,8 +58,10 @@ public class TbTransformMsgNodeTest { @@ -56,8 +58,10 @@ public class TbTransformMsgNodeTest {
56 metaData.putValue("temp", "7"); 58 metaData.putValue("temp", "7");
57 String rawJson = "{\"passed\": 5}"; 59 String rawJson = "{\"passed\": 5}";
58 60
59 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);  
60 - TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}"); 61 + RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  62 + RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  63 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
  64 + TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}", ruleChainId, ruleNodeId, 0L);
61 mockJsExecutor(); 65 mockJsExecutor();
62 when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg); 66 when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg);
63 67
@@ -77,8 +81,10 @@ public class TbTransformMsgNodeTest { @@ -77,8 +81,10 @@ public class TbTransformMsgNodeTest {
77 metaData.putValue("temp", "7"); 81 metaData.putValue("temp", "7");
78 String rawJson = "{\"passed\": 5"; 82 String rawJson = "{\"passed\": 5";
79 83
80 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson);  
81 - TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}"); 84 + RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  85 + RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  86 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
  87 + TbMsg transformedMsg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, "{new}", ruleChainId, ruleNodeId, 0L);
82 mockJsExecutor(); 88 mockJsExecutor();
83 when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg); 89 when(scriptEngine.executeUpdate(msg)).thenReturn(transformedMsg);
84 90
@@ -97,7 +103,9 @@ public class TbTransformMsgNodeTest { @@ -97,7 +103,9 @@ public class TbTransformMsgNodeTest {
97 metaData.putValue("temp", "7"); 103 metaData.putValue("temp", "7");
98 String rawJson = "{\"passed\": 5"; 104 String rawJson = "{\"passed\": 5";
99 105
100 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 106 + RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased());
  107 + RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased());
  108 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, ruleChainId, ruleNodeId, 0L);
101 mockJsExecutor(); 109 mockJsExecutor();
102 when(scriptEngine.executeUpdate(msg)).thenThrow(new IllegalStateException("error")); 110 when(scriptEngine.executeUpdate(msg)).thenThrow(new IllegalStateException("error"));
103 111