Commit a2d2b559ac1bd2a94a0359fc92f210b5e79625d5

Authored by Andrew Shvayka
1 parent fb722c59

Queue put and ack

@@ -53,7 +53,6 @@ import java.util.function.Consumer; @@ -53,7 +53,6 @@ import java.util.function.Consumer;
53 */ 53 */
54 class DefaultTbContext implements TbContext { 54 class DefaultTbContext implements TbContext {
55 55
56 - private static final Function<? super List<Void>, ? extends Void> LIST_VOID_FUNCTION = v -> null;  
57 private final ActorSystemContext mainCtx; 56 private final ActorSystemContext mainCtx;
58 private final RuleNodeCtx nodeCtx; 57 private final RuleNodeCtx nodeCtx;
59 58
@@ -120,7 +119,7 @@ class DefaultTbContext implements TbContext { @@ -120,7 +119,7 @@ class DefaultTbContext implements TbContext {
120 119
121 @Override 120 @Override
122 public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) { 121 public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
123 - return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data); 122 + return new TbMsg(UUIDs.timeBased(), type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), 0L);
124 } 123 }
125 124
126 @Override 125 @Override
@@ -169,12 +169,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -169,12 +169,12 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
169 169
170 void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) { 170 void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
171 checkActive(); 171 checkActive();
172 - putToQueue(envelope.getTbMsg(), msg -> pushMsgToNode(firstNode, msg)); 172 + putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> pushMsgToNode(firstNode, msg));
173 } 173 }
174 174
175 void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) { 175 void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg envelope) {
176 checkActive(); 176 checkActive();
177 - putToQueue(envelope.getTbMsg(), msg -> { 177 + putToQueue(enrichWithRuleChainId(envelope.getTbMsg()), msg -> {
178 pushMsgToNode(firstNode, msg); 178 pushMsgToNode(firstNode, msg);
179 envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self); 179 envelope.getCallbackRef().tell(new RuleEngineQueuePutAckMsg(msg.getId()), self);
180 }); 180 });
@@ -185,7 +185,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -185,7 +185,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
185 RuleNodeId originator = envelope.getOriginator(); 185 RuleNodeId originator = envelope.getOriginator();
186 String targetRelationType = envelope.getRelationType(); 186 String targetRelationType = envelope.getRelationType();
187 List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream() 187 List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
188 - .filter(r -> targetRelationType == null || targetRelationType.equals(r.getType())) 188 + .filter(r -> targetRelationType == null || targetRelationType.equalsIgnoreCase(r.getType()))
189 .collect(Collectors.toList()); 189 .collect(Collectors.toList());
190 190
191 TbMsg msg = envelope.getMsg(); 191 TbMsg msg = envelope.getMsg();
@@ -212,7 +212,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -212,7 +212,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
212 } 212 }
213 } 213 }
214 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues. 214 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
215 - queue.ack(msg, msg.getRuleNodeId().getId(), msg.getClusterPartition()); 215 + EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
  216 + queue.ack(msg, ackId.getId(), msg.getClusterPartition());
216 } 217 }
217 } 218 }
218 219
@@ -232,4 +233,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -232,4 +233,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
232 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self); 233 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
233 } 234 }
234 } 235 }
  236 +
  237 + private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
  238 + // We don't put firstNodeId because it may change over time;
  239 + return new TbMsg(tbMsg.getId(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData(), tbMsg.getData(), entityId, null, 0L);
  240 + }
235 } 241 }
@@ -87,6 +87,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract @@ -87,6 +87,7 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
87 } 87 }
88 88
89 protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) { 89 protected void putToQueue(final TbMsg tbMsg, final Consumer<TbMsg> onSuccess) {
  90 + EntityId entityId = tbMsg.getRuleNodeId() != null ? tbMsg.getRuleNodeId() : tbMsg.getRuleChainId();
90 Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() { 91 Futures.addCallback(queue.put(tbMsg, entityId.getId(), 0), new FutureCallback<Void>() {
91 @Override 92 @Override
92 public void onSuccess(@Nullable Void result) { 93 public void onSuccess(@Nullable Void result) {
@@ -80,7 +80,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -80,7 +80,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
80 } 80 }
81 } 81 }
82 82
83 - @Ignore  
84 @Test 83 @Test
85 public void testServerMqttOneWayRpc() throws Exception { 84 public void testServerMqttOneWayRpc() throws Exception {
86 Device device = new Device(); 85 Device device = new Device();
@@ -107,7 +106,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -107,7 +106,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
107 Assert.assertTrue(StringUtils.isEmpty(result)); 106 Assert.assertTrue(StringUtils.isEmpty(result));
108 } 107 }
109 108
110 - @Ignore  
111 @Test 109 @Test
112 public void testServerMqttOneWayRpcDeviceOffline() throws Exception { 110 public void testServerMqttOneWayRpcDeviceOffline() throws Exception {
113 Device device = new Device(); 111 Device device = new Device();
@@ -15,7 +15,7 @@ @@ -15,7 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.mqtt.rpc.sql; 16 package org.thingsboard.server.mqtt.rpc.sql;
17 17
18 -import org.thingsboard.server.dao.service.DaoNoSqlTest; 18 +import org.junit.Ignore;
19 import org.thingsboard.server.dao.service.DaoSqlTest; 19 import org.thingsboard.server.dao.service.DaoSqlTest;
20 import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest; 20 import org.thingsboard.server.mqtt.rpc.AbstractMqttServerSideRpcIntegrationTest;
21 21
@@ -150,7 +150,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -150,7 +150,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
150 "CUSTOM", 150 "CUSTOM",
151 device.getId(), 151 device.getId(),
152 new TbMsgMetaData(), 152 new TbMsgMetaData(),
153 - "{}"); 153 + "{}", null, null, 0L);
154 actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); 154 actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
155 155
156 Thread.sleep(3000); 156 Thread.sleep(3000);
@@ -138,7 +138,8 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac @@ -138,7 +138,8 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
138 "CUSTOM", 138 "CUSTOM",
139 device.getId(), 139 device.getId(),
140 new TbMsgMetaData(), 140 new TbMsgMetaData(),
141 - "{}"); 141 + "{}",
  142 + null, null, 0L);
142 actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); 143 actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg));
143 144
144 Thread.sleep(3000); 145 Thread.sleep(3000);
@@ -42,7 +42,7 @@ public class NashornJsEngineTest { @@ -42,7 +42,7 @@ public class NashornJsEngineTest {
42 metaData.putValue("humidity", "99"); 42 metaData.putValue("humidity", "99");
43 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 43 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
44 44
45 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 45 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
46 46
47 TbMsg actual = scriptEngine.executeUpdate(msg); 47 TbMsg actual = scriptEngine.executeUpdate(msg);
48 assertEquals("70", actual.getMetaData().getValue("temp")); 48 assertEquals("70", actual.getMetaData().getValue("temp"));
@@ -57,7 +57,7 @@ public class NashornJsEngineTest { @@ -57,7 +57,7 @@ public class NashornJsEngineTest {
57 metaData.putValue("humidity", "99"); 57 metaData.putValue("humidity", "99");
58 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 58 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
59 59
60 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 60 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
61 61
62 TbMsg actual = scriptEngine.executeUpdate(msg); 62 TbMsg actual = scriptEngine.executeUpdate(msg);
63 assertEquals("94", actual.getMetaData().getValue("newAttr")); 63 assertEquals("94", actual.getMetaData().getValue("newAttr"));
@@ -72,7 +72,7 @@ public class NashornJsEngineTest { @@ -72,7 +72,7 @@ public class NashornJsEngineTest {
72 metaData.putValue("humidity", "99"); 72 metaData.putValue("humidity", "99");
73 String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}"; 73 String rawJson = "{\"name\":\"Vit\",\"passed\": 5,\"bigObj\":{\"prop\":42}}";
74 74
75 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 75 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
76 76
77 TbMsg actual = scriptEngine.executeUpdate(msg); 77 TbMsg actual = scriptEngine.executeUpdate(msg);
78 78
@@ -89,7 +89,7 @@ public class NashornJsEngineTest { @@ -89,7 +89,7 @@ public class NashornJsEngineTest {
89 metaData.putValue("humidity", "99"); 89 metaData.putValue("humidity", "99");
90 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 90 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
91 91
92 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 92 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
93 assertFalse(scriptEngine.executeFilter(msg)); 93 assertFalse(scriptEngine.executeFilter(msg));
94 } 94 }
95 95
@@ -102,7 +102,7 @@ public class NashornJsEngineTest { @@ -102,7 +102,7 @@ public class NashornJsEngineTest {
102 metaData.putValue("humidity", "99"); 102 metaData.putValue("humidity", "99");
103 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 103 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
104 104
105 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 105 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
106 assertTrue(scriptEngine.executeFilter(msg)); 106 assertTrue(scriptEngine.executeFilter(msg));
107 } 107 }
108 108
@@ -122,7 +122,7 @@ public class NashornJsEngineTest { @@ -122,7 +122,7 @@ public class NashornJsEngineTest {
122 metaData.putValue("humidity", "99"); 122 metaData.putValue("humidity", "99");
123 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 123 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
124 124
125 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 125 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
126 Set<String> actual = scriptEngine.executeSwitch(msg); 126 Set<String> actual = scriptEngine.executeSwitch(msg);
127 assertEquals(Sets.newHashSet("one"), actual); 127 assertEquals(Sets.newHashSet("one"), actual);
128 } 128 }
@@ -143,7 +143,7 @@ public class NashornJsEngineTest { @@ -143,7 +143,7 @@ public class NashornJsEngineTest {
143 metaData.putValue("humidity", "99"); 143 metaData.putValue("humidity", "99");
144 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}"; 144 String rawJson = "{\"name\": \"Vit\", \"passed\": 5, \"bigObj\": {\"prop\":42}}";
145 145
146 - TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson); 146 + TbMsg msg = new TbMsg(UUIDs.timeBased(), "USER", null, metaData, rawJson, null, null, 0L);
147 Set<String> actual = scriptEngine.executeSwitch(msg); 147 Set<String> actual = scriptEngine.executeSwitch(msg);
148 assertEquals(Sets.newHashSet("one", "three"), actual); 148 assertEquals(Sets.newHashSet("one", "three"), actual);
149 } 149 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sql.queue;
  17 +
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.springframework.stereotype.Component;
  22 +import org.thingsboard.server.common.msg.TbMsg;
  23 +import org.thingsboard.server.dao.queue.MsgQueue;
  24 +import org.thingsboard.server.dao.util.SqlDao;
  25 +
  26 +import java.util.Collections;
  27 +import java.util.UUID;
  28 +
  29 +/**
  30 + * Created by ashvayka on 27.04.18.
  31 + */
  32 +@Component
  33 +@Slf4j
  34 +@SqlDao
  35 +public class DummySqlMsgQueue implements MsgQueue {
  36 + @Override
  37 + public ListenableFuture<Void> put(TbMsg msg, UUID nodeId, long clusterPartition) {
  38 + return Futures.immediateFuture(null);
  39 + }
  40 +
  41 + @Override
  42 + public ListenableFuture<Void> ack(TbMsg msg, UUID nodeId, long clusterPartition) {
  43 + return Futures.immediateFuture(null);
  44 + }
  45 +
  46 + @Override
  47 + public Iterable<TbMsg> findUnprocessed(UUID nodeId, long clusterPartition) {
  48 + return Collections.emptyList();
  49 + }
  50 +}
@@ -61,7 +61,7 @@ public class TbJsSwitchNode implements TbNode { @@ -61,7 +61,7 @@ public class TbJsSwitchNode implements TbNode {
61 ctx.tellNext(msg, nextRelations); 61 ctx.tellNext(msg, nextRelations);
62 } 62 }
63 63
64 - @Override 64 + @Override
65 public void destroy() { 65 public void destroy() {
66 if (jsEngine != null) { 66 if (jsEngine != null) {
67 jsEngine.destroy(); 67 jsEngine.destroy();
@@ -118,17 +118,21 @@ public class TbAlarmNodeTest { @@ -118,17 +118,21 @@ public class TbAlarmNodeTest {
118 118
119 node.onMsg(ctx, msg); 119 node.onMsg(ctx, msg);
120 120
121 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
122 - verify(ctx).tellNext(captor.capture(), eq("Created"));  
123 - TbMsg actualMsg = captor.getValue(); 121 + verify(ctx).tellNext(any(), eq("Created"));
124 122
125 - assertEquals("ALARM", actualMsg.getType());  
126 - assertEquals(originator, actualMsg.getOriginator());  
127 - assertEquals("value", actualMsg.getMetaData().getValue("key"));  
128 - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));  
129 - assertNotSame(metaData, actualMsg.getMetaData()); 123 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  124 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  125 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  126 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  127 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
130 128
131 - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); 129 + assertEquals("ALARM", typeCaptor.getValue());
  130 + assertEquals(originator, originatorCaptor.getValue());
  131 + assertEquals("value", metadataCaptor.getValue().getValue("key"));
  132 + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
  133 + assertNotSame(metaData, metadataCaptor.getValue());
  134 +
  135 + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
132 Alarm expectedAlarm = Alarm.builder() 136 Alarm expectedAlarm = Alarm.builder()
133 .tenantId(tenantId) 137 .tenantId(tenantId)
134 .originator(originator) 138 .originator(originator)
@@ -208,17 +212,22 @@ public class TbAlarmNodeTest { @@ -208,17 +212,22 @@ public class TbAlarmNodeTest {
208 212
209 node.onMsg(ctx, msg); 213 node.onMsg(ctx, msg);
210 214
211 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
212 - verify(ctx).tellNext(captor.capture(), eq("Created"));  
213 - TbMsg actualMsg = captor.getValue(); 215 + verify(ctx).tellNext(any(), eq("Created"));
  216 +
  217 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  218 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  219 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  220 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  221 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
  222 +
  223 + assertEquals("ALARM", typeCaptor.getValue());
  224 + assertEquals(originator, originatorCaptor.getValue());
  225 + assertEquals("value", metadataCaptor.getValue().getValue("key"));
  226 + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_NEW_ALARM));
  227 + assertNotSame(metaData, metadataCaptor.getValue());
214 228
215 - assertEquals("ALARM", actualMsg.getType());  
216 - assertEquals(originator, actualMsg.getOriginator());  
217 - assertEquals("value", actualMsg.getMetaData().getValue("key"));  
218 - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_NEW_ALARM));  
219 - assertNotSame(metaData, actualMsg.getMetaData());  
220 229
221 - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); 230 + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
222 Alarm expectedAlarm = Alarm.builder() 231 Alarm expectedAlarm = Alarm.builder()
223 .tenantId(tenantId) 232 .tenantId(tenantId)
224 .originator(originator) 233 .originator(originator)
@@ -252,17 +261,21 @@ public class TbAlarmNodeTest { @@ -252,17 +261,21 @@ public class TbAlarmNodeTest {
252 261
253 node.onMsg(ctx, msg); 262 node.onMsg(ctx, msg);
254 263
255 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
256 - verify(ctx).tellNext(captor.capture(), eq("Updated"));  
257 - TbMsg actualMsg = captor.getValue(); 264 + verify(ctx).tellNext(any(), eq("Updated"));
258 265
259 - assertEquals("ALARM", actualMsg.getType());  
260 - assertEquals(originator, actualMsg.getOriginator());  
261 - assertEquals("value", actualMsg.getMetaData().getValue("key"));  
262 - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_EXISTING_ALARM));  
263 - assertNotSame(metaData, actualMsg.getMetaData()); 266 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  267 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  268 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  269 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  270 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
264 271
265 - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); 272 + assertEquals("ALARM", typeCaptor.getValue());
  273 + assertEquals(originator, originatorCaptor.getValue());
  274 + assertEquals("value", metadataCaptor.getValue().getValue("key"));
  275 + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_EXISTING_ALARM));
  276 + assertNotSame(metaData, metadataCaptor.getValue());
  277 +
  278 + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
266 assertTrue(activeAlarm.getEndTs() > oldEndDate); 279 assertTrue(activeAlarm.getEndTs() > oldEndDate);
267 Alarm expectedAlarm = Alarm.builder() 280 Alarm expectedAlarm = Alarm.builder()
268 .tenantId(tenantId) 281 .tenantId(tenantId)
@@ -298,17 +311,21 @@ public class TbAlarmNodeTest { @@ -298,17 +311,21 @@ public class TbAlarmNodeTest {
298 311
299 node.onMsg(ctx, msg); 312 node.onMsg(ctx, msg);
300 313
301 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
302 - verify(ctx).tellNext(captor.capture(), eq("Cleared"));  
303 - TbMsg actualMsg = captor.getValue(); 314 + verify(ctx).tellNext(any(), eq("Cleared"));
  315 +
  316 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  317 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  318 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  319 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  320 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
304 321
305 - assertEquals("ALARM", actualMsg.getType());  
306 - assertEquals(originator, actualMsg.getOriginator());  
307 - assertEquals("value", actualMsg.getMetaData().getValue("key"));  
308 - assertEquals(Boolean.TRUE.toString(), actualMsg.getMetaData().getValue(IS_CLEARED_ALARM));  
309 - assertNotSame(metaData, actualMsg.getMetaData()); 322 + assertEquals("ALARM", typeCaptor.getValue());
  323 + assertEquals(originator, originatorCaptor.getValue());
  324 + assertEquals("value", metadataCaptor.getValue().getValue("key"));
  325 + assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_CLEARED_ALARM));
  326 + assertNotSame(metaData, metadataCaptor.getValue());
310 327
311 - Alarm actualAlarm = new ObjectMapper().readValue(actualMsg.getData().getBytes(), Alarm.class); 328 + Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
312 Alarm expectedAlarm = Alarm.builder() 329 Alarm expectedAlarm = Alarm.builder()
313 .tenantId(tenantId) 330 .tenantId(tenantId)
314 .originator(originator) 331 .originator(originator)
@@ -36,7 +36,9 @@ import java.io.IOException; @@ -36,7 +36,9 @@ import java.io.IOException;
36 36
37 import static org.junit.Assert.assertEquals; 37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertNotSame; 38 import static org.junit.Assert.assertNotSame;
  39 +import static org.mockito.Matchers.any;
39 import static org.mockito.Mockito.verify; 40 import static org.mockito.Mockito.verify;
  41 +import static org.mockito.Mockito.when;
40 42
41 @RunWith(MockitoJUnitRunner.class) 43 @RunWith(MockitoJUnitRunner.class)
42 public class TbMsgToEmailNodeTest { 44 public class TbMsgToEmailNodeTest {
@@ -62,17 +64,19 @@ public class TbMsgToEmailNodeTest { @@ -62,17 +64,19 @@ public class TbMsgToEmailNodeTest {
62 64
63 emailNode.onMsg(ctx, msg); 65 emailNode.onMsg(ctx, msg);
64 66
65 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
66 - verify(ctx).tellNext(captor.capture());  
67 - TbMsg actualMsg = captor.getValue(); 67 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  68 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  69 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  70 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  71 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
68 72
69 - assertEquals("SEND_EMAIL", actualMsg.getType());  
70 - assertEquals(originator, actualMsg.getOriginator());  
71 - assertEquals("oreo", actualMsg.getMetaData().getValue("username"));  
72 - assertNotSame(metaData, actualMsg.getMetaData());  
73 73
  74 + assertEquals("SEND_EMAIL", typeCaptor.getValue());
  75 + assertEquals(originator, originatorCaptor.getValue());
  76 + assertEquals("oreo", metadataCaptor.getValue().getValue("username"));
  77 + assertNotSame(metaData, metadataCaptor.getValue());
74 78
75 - EmailPojo actual = new ObjectMapper().readValue(actualMsg.getData().getBytes(), EmailPojo.class); 79 + EmailPojo actual = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), EmailPojo.class);
76 80
77 EmailPojo expected = new EmailPojo.EmailPojoBuilder() 81 EmailPojo expected = new EmailPojo.EmailPojoBuilder()
78 .from("test@mail.org") 82 .from("test@mail.org")
@@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbNodeException; @@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbNodeException;
29 import org.thingsboard.server.common.data.asset.Asset; 29 import org.thingsboard.server.common.data.asset.Asset;
30 import org.thingsboard.server.common.data.id.AssetId; 30 import org.thingsboard.server.common.data.id.AssetId;
31 import org.thingsboard.server.common.data.id.CustomerId; 31 import org.thingsboard.server.common.data.id.CustomerId;
  32 +import org.thingsboard.server.common.data.id.EntityId;
32 import org.thingsboard.server.common.data.id.RuleChainId; 33 import org.thingsboard.server.common.data.id.RuleChainId;
33 import org.thingsboard.server.common.data.id.RuleNodeId; 34 import org.thingsboard.server.common.data.id.RuleNodeId;
34 import org.thingsboard.server.common.msg.TbMsg; 35 import org.thingsboard.server.common.msg.TbMsg;
@@ -68,11 +69,14 @@ public class TbChangeOriginatorNodeTest { @@ -68,11 +69,14 @@ public class TbChangeOriginatorNodeTest {
68 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); 69 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
69 70
70 node.onMsg(ctx, msg); 71 node.onMsg(ctx, msg);
71 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
72 - verify(ctx).tellNext(captor.capture());  
73 - TbMsg actualMsg = captor.getValue();  
74 - assertEquals(customerId, actualMsg.getOriginator());  
75 - assertEquals(msg.getId(), actualMsg.getId()); 72 +
  73 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  74 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  75 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  76 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  77 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
  78 +
  79 + assertEquals(customerId, originatorCaptor.getValue());
76 } 80 }
77 81
78 @Test 82 @Test
@@ -92,11 +96,13 @@ public class TbChangeOriginatorNodeTest { @@ -92,11 +96,13 @@ public class TbChangeOriginatorNodeTest {
92 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset)); 96 when(assetService.findAssetByIdAsync(assetId)).thenReturn(Futures.immediateFuture(asset));
93 97
94 node.onMsg(ctx, msg); 98 node.onMsg(ctx, msg);
95 - ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class);  
96 - verify(ctx).spawn(captor.capture());  
97 - TbMsg actualMsg = captor.getValue();  
98 - assertEquals(customerId, actualMsg.getOriginator());  
99 - assertEquals(msg.getId(), actualMsg.getId()); 99 + ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
  100 + ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
  101 + ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
  102 + ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
  103 + verify(ctx).newMsg(typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
  104 +
  105 + assertEquals(customerId, originatorCaptor.getValue());
100 } 106 }
101 107
102 @Test 108 @Test