Commit 3ee3839c3c00cd18bf729515530950235fb8edab

Authored by Andrii Shvaika
1 parent 78ac8761

Protection from infinite loops

... ... @@ -334,7 +334,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
334 334
335 335 private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
336 336 if (nodeCtx != null) {
337   - apiUsageClient.report(tenantId, ApiUsageRecordKey.RE_EXEC_COUNT);
338 337 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType));
339 338 } else {
340 339 log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
... ...
... ... @@ -22,10 +22,13 @@ import org.thingsboard.server.actors.TbActorCtx;
22 22 import org.thingsboard.server.actors.TbActorRef;
23 23 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
24 24 import org.thingsboard.server.common.data.ApiUsageRecordKey;
  25 +import org.thingsboard.server.common.data.TenantProfile;
25 26 import org.thingsboard.server.common.data.id.RuleNodeId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
27 28 import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
28 29 import org.thingsboard.server.common.data.rule.RuleNode;
  30 +import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
  31 +import org.thingsboard.server.common.msg.TbMsg;
29 32 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
30 33 import org.thingsboard.server.common.msg.queue.RuleNodeException;
31 34 import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
... ... @@ -96,27 +99,42 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
96 99
97 100 public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception {
98 101 checkActive(msg.getMsg());
99   - apiUsageClient.report(tenantId, ApiUsageRecordKey.RE_EXEC_COUNT);
100   - if (ruleNode.isDebugMode()) {
101   - systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self");
102   - }
103   - try {
104   - tbNode.onMsg(defaultCtx, msg.getMsg());
105   - } catch (Exception e) {
106   - defaultCtx.tellFailure(msg.getMsg(), e);
  102 + TbMsg tbMsg = msg.getMsg();
  103 + int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
  104 + int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
  105 + if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
  106 + apiUsageClient.report(tenantId, ApiUsageRecordKey.RE_EXEC_COUNT);
  107 + if (ruleNode.isDebugMode()) {
  108 + systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self");
  109 + }
  110 + try {
  111 + tbNode.onMsg(defaultCtx, msg.getMsg());
  112 + } catch (Exception e) {
  113 + defaultCtx.tellFailure(msg.getMsg(), e);
  114 + }
  115 + } else {
  116 + tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
107 117 }
108 118 }
109 119
110 120 void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
111 121 msg.getMsg().getCallback().onProcessingStart(info);
112 122 checkActive(msg.getMsg());
113   - if (ruleNode.isDebugMode()) {
114   - systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
115   - }
116   - try {
117   - tbNode.onMsg(msg.getCtx(), msg.getMsg());
118   - } catch (Exception e) {
119   - msg.getCtx().tellFailure(msg.getMsg(), e);
  123 + TbMsg tbMsg = msg.getMsg();
  124 + int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
  125 + int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
  126 + if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
  127 + apiUsageClient.report(tenantId, ApiUsageRecordKey.RE_EXEC_COUNT);
  128 + if (ruleNode.isDebugMode()) {
  129 + systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
  130 + }
  131 + try {
  132 + tbNode.onMsg(msg.getCtx(), msg.getMsg());
  133 + } catch (Exception e) {
  134 + msg.getCtx().tellFailure(msg.getMsg(), e);
  135 + }
  136 + } else {
  137 + tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
120 138 }
121 139 }
122 140
... ...
... ... @@ -19,9 +19,11 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.actors.ActorSystemContext;
20 20 import org.thingsboard.server.actors.TbActorCtx;
21 21 import org.thingsboard.server.actors.stats.StatsPersistTick;
  22 +import org.thingsboard.server.common.data.TenantProfile;
22 23 import org.thingsboard.server.common.data.id.EntityId;
23 24 import org.thingsboard.server.common.data.id.TenantId;
24 25 import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
  26 +import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
25 27 import org.thingsboard.server.common.msg.TbMsg;
26 28 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
27 29 import org.thingsboard.server.common.msg.queue.RuleNodeException;
... ... @@ -39,6 +41,10 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract
39 41 this.entityId = id;
40 42 }
41 43
  44 + protected TenantProfileConfiguration getTenantProfileConfiguration() {
  45 + return systemContext.getTenantProfileCache().get(tenantId).getProfileData().getConfiguration();
  46 + }
  47 +
42 48 public abstract String getComponentName();
43 49
44 50 public abstract void start(TbActorCtx context) throws Exception;
... ...
... ... @@ -61,4 +61,9 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
61 61 public TenantProfileType getType() {
62 62 return TenantProfileType.DEFAULT;
63 63 }
  64 +
  65 + @Override
  66 + public int getMaxRuleNodeExecsPerMessage() {
  67 + return maxRuleNodeExecutionsPerMessage;
  68 + }
64 69 }
... ...
... ... @@ -37,4 +37,7 @@ public interface TenantProfileConfiguration {
37 37 @JsonIgnore
38 38 long getProfileThreshold(ApiUsageRecordKey key);
39 39
  40 + @JsonIgnore
  41 + int getMaxRuleNodeExecsPerMessage();
  42 +
40 43 }
... ...
... ... @@ -15,14 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.common.data.tenant.profile;
17 17
18   -import com.fasterxml.jackson.annotation.JsonAnyGetter;
19   -import com.fasterxml.jackson.annotation.JsonAnySetter;
20   -import com.fasterxml.jackson.annotation.JsonIgnore;
21 18 import lombok.Data;
22 19
23   -import java.util.HashMap;
24   -import java.util.Map;
25   -
26 20 @Data
27 21 public class TenantProfileData {
28 22
... ...
... ... @@ -18,27 +18,27 @@ package org.thingsboard.server.common.msg;
18 18 import com.fasterxml.jackson.annotation.JsonIgnore;
19 19 import com.google.protobuf.ByteString;
20 20 import com.google.protobuf.InvalidProtocolBufferException;
  21 +import lombok.AccessLevel;
21 22 import lombok.Builder;
22 23 import lombok.Data;
  24 +import lombok.Getter;
23 25 import lombok.extern.slf4j.Slf4j;
24 26 import org.thingsboard.server.common.data.id.EntityId;
25 27 import org.thingsboard.server.common.data.id.EntityIdFactory;
26 28 import org.thingsboard.server.common.data.id.RuleChainId;
27 29 import org.thingsboard.server.common.data.id.RuleNodeId;
28 30 import org.thingsboard.server.common.msg.gen.MsgProtos;
29   -import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
30 31 import org.thingsboard.server.common.msg.queue.ServiceQueue;
31 32 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
32 33
33   -import java.io.IOException;
34 34 import java.io.Serializable;
35 35 import java.util.UUID;
  36 +import java.util.concurrent.atomic.AtomicInteger;
36 37
37 38 /**
38 39 * Created by ashvayka on 13.01.18.
39 40 */
40 41 @Data
41   -@Builder
42 42 @Slf4j
43 43 public final class TbMsg implements Serializable {
44 44
... ... @@ -52,51 +52,63 @@ public final class TbMsg implements Serializable {
52 52 private final String data;
53 53 private final RuleChainId ruleChainId;
54 54 private final RuleNodeId ruleNodeId;
  55 + @Getter(value = AccessLevel.NONE)
  56 + private final AtomicInteger ruleNodeExecCounter;
  57 +
  58 + public int getAndIncrementRuleNodeCounter() {
  59 + return ruleNodeExecCounter.getAndIncrement();
  60 + }
  61 +
55 62 //This field is not serialized because we use queues and there is no need to do it
56 63 @JsonIgnore
57 64 transient private final TbMsgCallback callback;
58 65
59 66 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
60   - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  67 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator,
  68 + metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY);
61 69 }
62 70
63 71 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data) {
64   - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
  72 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY);
65 73 }
66 74
  75 + // REALLY NEW MSG
  76 +
67 77 public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
68   - return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, TbMsgCallback.EMPTY);
  78 + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, TbMsgCallback.EMPTY);
69 79 }
70 80
71 81 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data) {
72   - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, TbMsgCallback.EMPTY);
  82 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, null, null, 0, TbMsgCallback.EMPTY);
73 83 }
74 84
  85 + // For Tests only
  86 +
75 87 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
76   - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  88 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, 0, TbMsgCallback.EMPTY);
77 89 }
78 90
79 91 public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) {
80   - return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback);
  92 + return new TbMsg(ServiceQueue.MAIN, UUID.randomUUID(), System.currentTimeMillis(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, 0, callback);
81 93 }
82 94
83   - public static TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
84   - return new TbMsg(origMsg.getQueueName(), origMsg.getId(), origMsg.getTs(), type, originator, metaData.copy(), origMsg.getDataType(),
85   - data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback());
  95 + public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
  96 + return new TbMsg(tbMsg.getQueueName(), tbMsg.getId(), tbMsg.getTs(), type, originator, metaData.copy(), tbMsg.getDataType(),
  97 + data, tbMsg.getRuleChainId(), tbMsg.getRuleNodeId(), tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback());
86 98 }
87 99
88   - public static TbMsg transformMsg(TbMsg origMsg, RuleChainId ruleChainId) {
89   - return new TbMsg(origMsg.queueName, origMsg.id, origMsg.ts, origMsg.type, origMsg.originator, origMsg.metaData, origMsg.dataType,
90   - origMsg.data, ruleChainId, null, origMsg.getCallback());
  100 + public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId) {
  101 + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.metaData, tbMsg.dataType,
  102 + tbMsg.data, ruleChainId, null, tbMsg.ruleNodeExecCounter.get(), tbMsg.getCallback());
91 103 }
92 104
93 105 public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
94 106 return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
95   - tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
  107 + tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ruleNodeExecCounter.get(), TbMsgCallback.EMPTY);
96 108 }
97 109
98 110 private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
99   - RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgCallback callback) {
  111 + RuleChainId ruleChainId, RuleNodeId ruleNodeId, int ruleNodeExecCounter, TbMsgCallback callback) {
100 112 this.id = id;
101 113 this.queueName = queueName;
102 114 if (ts > 0) {
... ... @@ -111,6 +123,7 @@ public final class TbMsg implements Serializable {
111 123 this.data = data;
112 124 this.ruleChainId = ruleChainId;
113 125 this.ruleNodeId = ruleNodeId;
  126 + this.ruleNodeExecCounter = new AtomicInteger(ruleNodeExecCounter);
114 127 if (callback != null) {
115 128 this.callback = callback;
116 129 } else {
... ... @@ -147,6 +160,7 @@ public final class TbMsg implements Serializable {
147 160
148 161 builder.setDataType(msg.getDataType().ordinal());
149 162 builder.setData(msg.getData());
  163 + builder.setRuleNodeExecCounter(msg.ruleNodeExecCounter.get());
150 164 return builder.build().toByteArray();
151 165 }
152 166
... ... @@ -164,18 +178,18 @@ public final class TbMsg implements Serializable {
164 178 ruleNodeId = new RuleNodeId(new UUID(proto.getRuleNodeIdMSB(), proto.getRuleNodeIdLSB()));
165 179 }
166 180 TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
167   - return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, callback);
  181 + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, proto.getRuleNodeExecCounter(), callback);
168 182 } catch (InvalidProtocolBufferException e) {
169 183 throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
170 184 }
171 185 }
172 186
173 187 public TbMsg copyWithRuleChainId(RuleChainId ruleChainId) {
174   - return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, callback);
  188 + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, null, this.ruleNodeExecCounter.get(), callback);
175 189 }
176 190
177 191 public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
178   - return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, callback);
  192 + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ruleNodeExecCounter.get(), callback);
179 193 }
180 194
181 195 public TbMsgCallback getCallback() {
... ...
... ... @@ -45,4 +45,5 @@ message TbMsgProto {
45 45 string data = 14;
46 46
47 47 int64 ts = 15;
  48 + int32 ruleNodeExecCounter = 16;
48 49 }
\ No newline at end of file
... ...