Commit bfb055cc6e5044bec83155b2e91ee6e9f504fbc0

Authored by Dima Landiak
Committed by Andrew Shvayka
1 parent 4457b5a1

enqueue newly created messages

@@ -188,7 +188,7 @@ class AlarmState { @@ -188,7 +188,7 @@ class AlarmState {
188 setAlarmConditionMetadata(ruleState, metaData); 188 setAlarmConditionMetadata(ruleState, metaData);
189 TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM", 189 TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM",
190 originator, msg != null ? msg.getCustomerId() : null, metaData, data); 190 originator, msg != null ? msg.getCustomerId() : null, metaData, data);
191 - ctx.tellNext(newMsg, relationType); 191 + ctx.enqueueForTellNext(newMsg, relationType);
192 } 192 }
193 193
194 protected void setAlarmConditionMetadata(AlarmRuleState ruleState, TbMsgMetaData metaData) { 194 protected void setAlarmConditionMetadata(AlarmRuleState ruleState, TbMsgMetaData metaData) {
@@ -120,7 +120,7 @@ public class TbSendRPCRequestNode implements TbNode { @@ -120,7 +120,7 @@ public class TbSendRPCRequestNode implements TbNode {
120 ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); 120 ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
121 } else { 121 } else {
122 TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); 122 TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
123 - ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); 123 + ctx.enqueueForTellFailure(next, ruleEngineDeviceRpcResponse.getError().get().name());
124 } 124 }
125 }); 125 });
126 ctx.ack(msg); 126 ctx.ack(msg);