Commit 65c644b86fa7463cfce7048d4715d3ce810e7550

Authored by Andrii Shvaika
1 parent 480d4d69

Message is pushed to correct queue in case of duplication

... ... @@ -221,8 +221,8 @@ class DefaultTbContext implements TbContext {
221 221 }
222 222
223 223 @Override
224   - public TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
225   - return TbMsg.newMsg(type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
  224 + public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
  225 + return TbMsg.newMsg(queueName, type, originator, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
226 226 }
227 227
228 228 @Override
... ...
... ... @@ -244,7 +244,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
244 244 try {
245 245 checkActive(msg);
246 246 EntityId entityId = msg.getOriginator();
247   - TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
  247 + TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
248 248 List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
249 249 .filter(r -> contains(relationTypes, r.getType()))
250 250 .collect(Collectors.toList());
... ...
... ... @@ -166,7 +166,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
166 166 TbMsgCallback callback = new TbMsgPackCallback(id, tenantId, ctx);
167 167 try {
168 168 if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
169   - forwardToRuleEngineActor(tenantId, toRuleEngineMsg, callback);
  169 + forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
170 170 } else {
171 171 callback.onSuccess();
172 172 }
... ... @@ -180,7 +180,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
180 180 timeout = true;
181 181 }
182 182
183   - TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(timeout, ctx);
  183 + TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
184 184 TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
185 185 if (statsEnabled) {
186 186 stats.log(result, decision.isCommit());
... ... @@ -246,8 +246,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
246 246 }
247 247 }
248 248
249   - private void forwardToRuleEngineActor(TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
250   - TbMsg tbMsg = TbMsg.fromBytes(toRuleEngineMsg.getTbMsg().toByteArray(), callback);
  249 + private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
  250 + TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
251 251 QueueToRuleEngineMsg msg;
252 252 ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
253 253 Set<String> relationTypes = null;
... ...
... ... @@ -28,13 +28,16 @@ import java.util.concurrent.ConcurrentMap;
28 28 public class TbRuleEngineProcessingResult {
29 29
30 30 @Getter
  31 + private final String queueName;
  32 + @Getter
31 33 private final boolean success;
32 34 @Getter
33 35 private final boolean timeout;
34 36 @Getter
35 37 private final TbMsgPackProcessingContext ctx;
36 38
37   - public TbRuleEngineProcessingResult(boolean timeout, TbMsgPackProcessingContext ctx) {
  39 + public TbRuleEngineProcessingResult(String queueName, boolean timeout, TbMsgPackProcessingContext ctx) {
  40 + this.queueName = queueName;
38 41 this.timeout = timeout;
39 42 this.ctx = ctx;
40 43 this.success = !timeout && ctx.getPendingMap().isEmpty() && ctx.getFailedMap().isEmpty();
... ...
... ... @@ -100,7 +100,7 @@ public class TbRuleEngineProcessingStrategyFactory {
100 100 }
101 101 log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
102 102 if (log.isTraceEnabled()) {
103   - toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
  103 + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
104 104 }
105 105 if (pauseBetweenRetries > 0) {
106 106 try {
... ... @@ -129,10 +129,10 @@ public class TbRuleEngineProcessingStrategyFactory {
129 129 log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
130 130 }
131 131 if (log.isTraceEnabled()) {
132   - result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
  132 + result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
133 133 }
134 134 if (log.isTraceEnabled()) {
135   - result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
  135 + result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
136 136 }
137 137 return new TbRuleEngineProcessingDecision(true, null);
138 138 }
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
25 25 import org.thingsboard.server.common.data.id.RuleChainId;
26 26 import org.thingsboard.server.common.data.id.RuleNodeId;
27 27 import org.thingsboard.server.common.msg.gen.MsgProtos;
  28 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
28 29 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
29 30
30 31 import java.io.IOException;
... ... @@ -39,6 +40,7 @@ import java.util.UUID;
39 40 @Slf4j
40 41 public final class TbMsg implements Serializable {
41 42
  43 + private final String queueName;
42 44 private final UUID id;
43 45 private final long ts;
44 46 private final String type;
... ... @@ -51,39 +53,44 @@ public final class TbMsg implements Serializable {
51 53 //This field is not serialized because we use queues and there is no need to do it
52 54 transient private final TbMsgCallback callback;
53 55
  56 + public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
  57 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  58 + }
  59 +
54 60 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
55   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
  61 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
56 62 }
57 63
58   - public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
59   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  64 + public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
  65 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
60 66 }
61 67
62 68 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
63   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, TbMsgCallback.EMPTY);
  69 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, TbMsgCallback.EMPTY);
64 70 }
65 71
66 72 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
67   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  73 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
68 74 }
69 75
70 76 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) {
71   - return new TbMsg(UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback);
  77 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback);
72 78 }
73 79
74 80 public static TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
75   - return new TbMsg(origMsg.getId(), origMsg.getTs(), type, originator, metaData.copy(), origMsg.getDataType(),
  81 + return new TbMsg(origMsg.getQueueName(), origMsg.getId(), origMsg.getTs(), type, originator, metaData.copy(), origMsg.getDataType(),
76 82 data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback());
77 83 }
78 84
79 85 public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
80   - return new TbMsg(UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
  86 + return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
81 87 tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
82 88 }
83 89
84   - private TbMsg(UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
  90 + private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
85 91 RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) {
86 92 this.id = id;
  93 + this.queueName = queueName;
87 94 if (ts > 0) {
88 95 this.ts = ts;
89 96 } else {
... ... @@ -136,7 +143,7 @@ public final class TbMsg implements Serializable {
136 143 return builder.build().toByteArray();
137 144 }
138 145
139   - public static TbMsg fromBytes(byte[] data, TbMsgCallback callback) {
  146 + public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) {
140 147 try {
141 148 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data);
142 149 TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
... ... @@ -150,18 +157,18 @@ public final class TbMsg implements Serializable {
150 157 ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
151 158 }
152 159 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
153   - return new TbMsg(UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback);
  160 + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback);
154 161 } catch (InvalidProtocolBufferException e) {
155 162 throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
156 163 }
157 164 }
158 165
159 166 public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) {
160   - return new TbMsg(this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback);
  167 + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback);
161 168 }
162 169
163 170 public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
164   - return new TbMsg(this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback);
  171 + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback);
165 172 }
166 173
167 174 public TbMsgCallback getCallback() {
... ... @@ -172,4 +179,8 @@ public final class TbMsg implements Serializable {
172 179 return TbMsgCallback.EMPTY;
173 180 }
174 181 }
  182 +
  183 + public String getQueueName() {
  184 + return queueName != null ? queueName : ServiceQueue.MAIN;
  185 + }
175 186 }
... ...
... ... @@ -34,7 +34,7 @@ public class ServiceQueue {
34 34
35 35 public ServiceQueue(ServiceType type, String queue) {
36 36 this.type = type;
37   - this.queue = queue;
  37 + this.queue = queue != null ? queue : MAIN;
38 38 }
39 39
40 40 public ServiceType getType() {
... ...
... ... @@ -130,7 +130,7 @@ public interface TbContext {
130 130
131 131 void ack(TbMsg tbMsg);
132 132
133   - TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data);
  133 + TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data);
134 134
135 135 TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data);
136 136
... ...
... ... @@ -136,7 +136,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
136 136 }
137 137
138 138 private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) {
139   - ctx.enqueueForTellNext(ctx.newMsg(msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()), SUCCESS);
  139 + ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getMetaData(), msg.getData()), SUCCESS);
140 140 }
141 141
142 142 private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) {
... ...
... ... @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
25 25 import org.thingsboard.server.common.msg.TbMsg;
26 26 import org.thingsboard.server.common.msg.TbMsgDataType;
27 27 import org.thingsboard.server.common.msg.TbMsgMetaData;
  28 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
28 29 import org.thingsboard.server.common.msg.session.SessionMsgType;
29 30
30 31 import java.util.UUID;
... ... @@ -75,7 +76,7 @@ public class TbMsgCountNode implements TbNode {
75 76 TbMsgMetaData metaData = new TbMsgMetaData();
76 77 metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay));
77 78
78   - TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson));
  79 + TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson));
79 80 ctx.enqueueForTellNext(tbMsg, SUCCESS);
80 81 scheduleTickMsg(ctx);
81 82 } else {
... ... @@ -91,7 +92,7 @@ public class TbMsgCountNode implements TbNode {
91 92 }
92 93 lastScheduledTs = lastScheduledTs + delay;
93 94 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
94   - TbMsg tickMsg = ctx.newMsg(TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
  95 + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_COUNT_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
95 96 nextTickId = tickMsg.getId();
96 97 ctx.tellSelf(tickMsg, curDelay);
97 98 }
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
26 26 import org.thingsboard.server.common.msg.TbMsg;
27 27 import org.thingsboard.server.common.msg.TbMsgMetaData;
28 28 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
  29 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
29 30
30 31 import java.util.UUID;
31 32 import java.util.concurrent.TimeUnit;
... ... @@ -118,7 +119,7 @@ public class TbMsgGeneratorNode implements TbNode {
118 119 }
119 120 lastScheduledTs = lastScheduledTs + delay;
120 121 long curDelay = Math.max(0L, (lastScheduledTs - curTs));
121   - TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
  122 + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
122 123 nextTickId = tickMsg.getId();
123 124 ctx.tellSelf(tickMsg, curDelay);
124 125 }
... ... @@ -126,13 +127,13 @@ public class TbMsgGeneratorNode implements TbNode {
126 127 private ListenableFuture<TbMsg> generate(TbContext ctx) {
127 128 return ctx.getJsExecutor().executeAsync(() -> {
128 129 if (prevMsg == null) {
129   - prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}");
  130 + prevMsg = ctx.newMsg(ServiceQueue.MAIN, "", originatorId, new TbMsgMetaData(), "{}");
130 131 }
131 132 if (initialized) {
132 133 ctx.logJsEvalRequest();
133 134 TbMsg generated = jsEngine.executeGenerate(prevMsg);
134 135 ctx.logJsEvalResponse();
135   - prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());
  136 + prevMsg = ctx.newMsg(ServiceQueue.MAIN, generated.getType(), originatorId, generated.getMetaData(), generated.getData());
136 137 }
137 138 return prevMsg;
138 139 });
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
26 26 import org.thingsboard.server.common.data.plugin.ComponentType;
27 27 import org.thingsboard.server.common.msg.TbMsg;
28 28 import org.thingsboard.server.common.msg.TbMsgMetaData;
  29 +import org.thingsboard.server.common.msg.queue.ServiceQueue;
29 30
30 31 import java.util.HashMap;
31 32 import java.util.Map;
... ... @@ -70,7 +71,7 @@ public class TbMsgDelayNode implements TbNode {
70 71 } else {
71 72 if (pendingMsgs.size() < config.getMaxPendingMsgs()) {
72 73 pendingMsgs.put(msg.getId(), msg);
73   - TbMsg tickMsg = ctx.newMsg(TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString());
  74 + TbMsg tickMsg = ctx.newMsg(ServiceQueue.MAIN, TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString());
74 75 ctx.tellSelf(tickMsg, getDelay(msg));
75 76 ctx.ack(msg);
76 77 } else {
... ...
... ... @@ -112,10 +112,10 @@ public class TbSendRPCRequestNode implements TbNode {
112 112
113 113 ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
114 114 if (!ruleEngineDeviceRpcResponse.getError().isPresent()) {
115   - TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
  115 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
116 116 ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
117 117 } else {
118   - TbMsg next = ctx.newMsg(msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
  118 + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
119 119 ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));
120 120 }
121 121 });
... ...