Commit 08ab9752fccb95fa8931e3247c252c8dc9d09f02
1 parent
d2919ba3
Better logging of Rule Engine errors
Showing
5 changed files
with
61 additions
and
29 deletions
... | ... | @@ -163,7 +163,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
163 | 163 | String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ? |
164 | 164 | DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME; |
165 | 165 | return context.actorOf( |
166 | - Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId())) | |
166 | + Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getName(), ruleNode.getId())) | |
167 | 167 | .withDispatcher(dispatcherName), ruleNode.getId().toString()); |
168 | 168 | } |
169 | 169 | |
... | ... | @@ -200,7 +200,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
200 | 200 | log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg); |
201 | 201 | if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) { |
202 | 202 | try { |
203 | - checkActive(); | |
203 | + checkActive(envelope.getTbMsg()); | |
204 | 204 | RuleNodeId targetId = msg.getRuleNodeId(); |
205 | 205 | RuleNodeCtx targetCtx; |
206 | 206 | if (targetId == null) { |
... | ... | @@ -216,6 +216,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
216 | 216 | log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId); |
217 | 217 | msg.getCallback().onSuccess(); |
218 | 218 | } |
219 | + } catch (RuleNodeException rne) { | |
220 | + envelope.getTbMsg().getCallback().onFailure(rne); | |
219 | 221 | } catch (Exception e) { |
220 | 222 | envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage())); |
221 | 223 | } |
... | ... | @@ -225,11 +227,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
225 | 227 | } |
226 | 228 | |
227 | 229 | void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) { |
228 | - checkActive(); | |
229 | - if (firstNode != null) { | |
230 | - pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType()); | |
231 | - } else { | |
232 | - envelope.getMsg().getCallback().onSuccess(); | |
230 | + try { | |
231 | + checkActive(envelope.getMsg()); | |
232 | + if (firstNode != null) { | |
233 | + pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType()); | |
234 | + } else { | |
235 | + envelope.getMsg().getCallback().onSuccess(); | |
236 | + } | |
237 | + } catch (RuleNodeException e) { | |
238 | + log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId); | |
233 | 239 | } |
234 | 240 | } |
235 | 241 | |
... | ... | @@ -239,7 +245,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
239 | 245 | |
240 | 246 | private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) { |
241 | 247 | try { |
242 | - checkActive(); | |
248 | + checkActive(msg); | |
243 | 249 | EntityId entityId = msg.getOriginator(); |
244 | 250 | TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); |
245 | 251 | List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream() |
... | ... | @@ -272,6 +278,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
272 | 278 | putToQueue(tpi, msg, callbackWrapper, target); |
273 | 279 | } |
274 | 280 | } |
281 | + } catch (RuleNodeException rne) { | |
282 | + msg.getCallback().onFailure(rne); | |
275 | 283 | } catch (Exception e) { |
276 | 284 | msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage())); |
277 | 285 | } |
... | ... | @@ -333,4 +341,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
333 | 341 | } |
334 | 342 | } |
335 | 343 | |
344 | + @Override | |
345 | + protected RuleNodeException getInactiveException() { | |
346 | + RuleNode firstRuleNode = firstNode != null ? firstNode.getSelf() : null; | |
347 | + return new RuleNodeException("Rule Chain is not active! Failed to initialize.", ruleChainName, firstRuleNode); | |
348 | + } | |
336 | 349 | } | ... | ... |
... | ... | @@ -27,12 +27,14 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; |
27 | 27 | |
28 | 28 | public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> { |
29 | 29 | |
30 | + private final String ruleChainName; | |
30 | 31 | private final RuleChainId ruleChainId; |
31 | 32 | |
32 | - private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { | |
33 | + private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) { | |
33 | 34 | super(systemContext, tenantId, ruleNodeId); |
35 | + this.ruleChainName = ruleChainName; | |
34 | 36 | this.ruleChainId = ruleChainId; |
35 | - setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext, | |
37 | + setProcessor(new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext, | |
36 | 38 | context().parent(), context().self())); |
37 | 39 | } |
38 | 40 | |
... | ... | @@ -96,19 +98,21 @@ public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessa |
96 | 98 | |
97 | 99 | private final TenantId tenantId; |
98 | 100 | private final RuleChainId ruleChainId; |
101 | + private final String ruleChainName; | |
99 | 102 | private final RuleNodeId ruleNodeId; |
100 | 103 | |
101 | - public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { | |
104 | + public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) { | |
102 | 105 | super(context); |
103 | 106 | this.tenantId = tenantId; |
104 | 107 | this.ruleChainId = ruleChainId; |
108 | + this.ruleChainName = ruleChainName; | |
105 | 109 | this.ruleNodeId = ruleNodeId; |
106 | 110 | |
107 | 111 | } |
108 | 112 | |
109 | 113 | @Override |
110 | 114 | public RuleNodeActor create() throws Exception { |
111 | - return new RuleNodeActor(context, tenantId, ruleChainId, ruleNodeId); | |
115 | + return new RuleNodeActor(context, tenantId, ruleChainId, ruleChainName, ruleNodeId); | |
112 | 116 | } |
113 | 117 | } |
114 | 118 | ... | ... |
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java
... | ... | @@ -17,37 +17,33 @@ package org.thingsboard.server.actors.ruleChain; |
17 | 17 | |
18 | 18 | import akka.actor.ActorContext; |
19 | 19 | import akka.actor.ActorRef; |
20 | -import org.thingsboard.rule.engine.api.TbContext; | |
21 | 20 | import org.thingsboard.rule.engine.api.TbNode; |
22 | 21 | import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
23 | 22 | import org.thingsboard.server.actors.ActorSystemContext; |
24 | 23 | import org.thingsboard.server.actors.shared.ComponentMsgProcessor; |
25 | -import org.thingsboard.server.common.data.id.RuleChainId; | |
26 | 24 | import org.thingsboard.server.common.data.id.RuleNodeId; |
27 | 25 | import org.thingsboard.server.common.data.id.TenantId; |
28 | 26 | import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; |
29 | 27 | import org.thingsboard.server.common.data.rule.RuleNode; |
30 | 28 | import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; |
31 | -import org.thingsboard.server.dao.rule.RuleChainService; | |
29 | +import org.thingsboard.server.common.msg.queue.RuleNodeException; | |
32 | 30 | |
33 | 31 | /** |
34 | 32 | * @author Andrew Shvayka |
35 | 33 | */ |
36 | 34 | public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> { |
37 | 35 | |
38 | - private final ActorRef parent; | |
36 | + private final String ruleChainName; | |
39 | 37 | private final ActorRef self; |
40 | - private final RuleChainService service; | |
41 | 38 | private RuleNode ruleNode; |
42 | 39 | private TbNode tbNode; |
43 | 40 | private DefaultTbContext defaultCtx; |
44 | 41 | |
45 | - RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext | |
42 | + RuleNodeActorMessageProcessor(TenantId tenantId, String ruleChainName, RuleNodeId ruleNodeId, ActorSystemContext systemContext | |
46 | 43 | , ActorRef parent, ActorRef self) { |
47 | 44 | super(systemContext, tenantId, ruleNodeId); |
48 | - this.parent = parent; | |
45 | + this.ruleChainName = ruleChainName; | |
49 | 46 | this.self = self; |
50 | - this.service = systemContext.getRuleChainService(); | |
51 | 47 | this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId); |
52 | 48 | this.defaultCtx = new DefaultTbContext(systemContext, new RuleNodeCtx(tenantId, parent, self, ruleNode)); |
53 | 49 | } |
... | ... | @@ -63,8 +59,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
63 | 59 | @Override |
64 | 60 | public void onUpdate(ActorContext context) throws Exception { |
65 | 61 | RuleNode newRuleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId); |
66 | - boolean restartRequired = !(ruleNode.getType().equals(newRuleNode.getType()) | |
67 | - && ruleNode.getConfiguration().equals(newRuleNode.getConfiguration())); | |
62 | + boolean restartRequired = state != ComponentLifecycleState.ACTIVE || | |
63 | + !(ruleNode.getType().equals(newRuleNode.getType()) && ruleNode.getConfiguration().equals(newRuleNode.getConfiguration())); | |
68 | 64 | this.ruleNode = newRuleNode; |
69 | 65 | this.defaultCtx.updateSelf(newRuleNode); |
70 | 66 | if (restartRequired) { |
... | ... | @@ -91,7 +87,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
91 | 87 | } |
92 | 88 | |
93 | 89 | public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception { |
94 | - checkActive(); | |
90 | + checkActive(msg.getMsg()); | |
95 | 91 | if (ruleNode.isDebugMode()) { |
96 | 92 | systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self"); |
97 | 93 | } |
... | ... | @@ -103,7 +99,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
103 | 99 | } |
104 | 100 | |
105 | 101 | void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception { |
106 | - checkActive(); | |
102 | + checkActive(msg.getMsg()); | |
107 | 103 | if (ruleNode.isDebugMode()) { |
108 | 104 | systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType()); |
109 | 105 | } |
... | ... | @@ -129,4 +125,8 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod |
129 | 125 | return tbNode; |
130 | 126 | } |
131 | 127 | |
128 | + @Override | |
129 | + protected RuleNodeException getInactiveException() { | |
130 | + return new RuleNodeException("Rule Node is not active! Failed to initialize.", ruleChainName, ruleNode); | |
131 | + } | |
132 | 132 | } | ... | ... |
... | ... | @@ -22,7 +22,10 @@ import org.thingsboard.server.actors.stats.StatsPersistTick; |
22 | 22 | import org.thingsboard.server.common.data.id.EntityId; |
23 | 23 | import org.thingsboard.server.common.data.id.TenantId; |
24 | 24 | import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; |
25 | +import org.thingsboard.server.common.msg.TbMsg; | |
25 | 26 | import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; |
27 | +import org.thingsboard.server.common.msg.queue.RuleEngineException; | |
28 | +import org.thingsboard.server.common.msg.queue.RuleNodeException; | |
26 | 29 | |
27 | 30 | @Slf4j |
28 | 31 | public abstract class ComponentMsgProcessor<T extends EntityId> extends AbstractContextAwareMsgProcessor { |
... | ... | @@ -74,11 +77,17 @@ public abstract class ComponentMsgProcessor<T extends EntityId> extends Abstract |
74 | 77 | schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency); |
75 | 78 | } |
76 | 79 | |
77 | - protected void checkActive() { | |
80 | + protected void checkActive(TbMsg tbMsg) throws RuleNodeException { | |
78 | 81 | if (state != ComponentLifecycleState.ACTIVE) { |
79 | 82 | log.debug("Component is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId); |
80 | - throw new IllegalStateException("Rule chain is not active! " + entityId + " - " + tenantId); | |
83 | + RuleNodeException ruleNodeException = getInactiveException(); | |
84 | + if (tbMsg != null) { | |
85 | + tbMsg.getCallback().onFailure(ruleNodeException); | |
86 | + } | |
87 | + throw ruleNodeException; | |
81 | 88 | } |
82 | 89 | } |
83 | 90 | |
91 | + abstract protected RuleNodeException getInactiveException(); | |
92 | + | |
84 | 93 | } | ... | ... |
... | ... | @@ -36,9 +36,15 @@ public class RuleNodeException extends RuleEngineException { |
36 | 36 | public RuleNodeException(String message, String ruleChainName, RuleNode ruleNode) { |
37 | 37 | super(message); |
38 | 38 | this.ruleChainName = ruleChainName; |
39 | - this.ruleNodeName = ruleNode.getName(); | |
40 | - this.ruleChainId = ruleNode.getRuleChainId(); | |
41 | - this.ruleNodeId = ruleNode.getId(); | |
39 | + if (ruleNode != null) { | |
40 | + this.ruleNodeName = ruleNode.getName(); | |
41 | + this.ruleChainId = ruleNode.getRuleChainId(); | |
42 | + this.ruleNodeId = ruleNode.getId(); | |
43 | + } else { | |
44 | + ruleNodeName = "Unknown"; | |
45 | + ruleChainId = new RuleChainId(RuleChainId.NULL_UUID); | |
46 | + ruleNodeId = new RuleNodeId(RuleNodeId.NULL_UUID); | |
47 | + } | |
42 | 48 | } |
43 | 49 | |
44 | 50 | public String toJsonString() { | ... | ... |