Commit 86647852169779d60a1f24d02cfbb25f1455f11d
1 parent
5d919df9
RuleChainActorMessageProcessor: onTellNext null pointer fix
Showing
1 changed file
with
12 additions
and
5 deletions
@@ -250,10 +250,17 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | @@ -250,10 +250,17 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | ||
250 | checkActive(msg); | 250 | checkActive(msg); |
251 | EntityId entityId = msg.getOriginator(); | 251 | EntityId entityId = msg.getOriginator(); |
252 | TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId); | 252 | TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId); |
253 | - List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream() | 253 | + |
254 | + List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId); | ||
255 | + if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore | ||
256 | + log.warn("[{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message.", tenantId, entityId, msg.getId()); | ||
257 | + ruleNodeRelations = Collections.emptyList(); | ||
258 | + } | ||
259 | + | ||
260 | + List<RuleNodeRelation> relationsByTypes = ruleNodeRelations.stream() | ||
254 | .filter(r -> contains(relationTypes, r.getType())) | 261 | .filter(r -> contains(relationTypes, r.getType())) |
255 | .collect(Collectors.toList()); | 262 | .collect(Collectors.toList()); |
256 | - int relationsCount = relations.size(); | 263 | + int relationsCount = relationsByTypes.size(); |
257 | if (relationsCount == 0) { | 264 | if (relationsCount == 0) { |
258 | log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId()); | 265 | log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId()); |
259 | if (relationTypes.contains(TbRelationTypes.FAILURE)) { | 266 | if (relationTypes.contains(TbRelationTypes.FAILURE)) { |
@@ -268,14 +275,14 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | @@ -268,14 +275,14 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | ||
268 | msg.getCallback().onSuccess(); | 275 | msg.getCallback().onSuccess(); |
269 | } | 276 | } |
270 | } else if (relationsCount == 1) { | 277 | } else if (relationsCount == 1) { |
271 | - for (RuleNodeRelation relation : relations) { | 278 | + for (RuleNodeRelation relation : relationsByTypes) { |
272 | log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut()); | 279 | log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut()); |
273 | pushToTarget(tpi, msg, relation.getOut(), relation.getType()); | 280 | pushToTarget(tpi, msg, relation.getOut(), relation.getType()); |
274 | } | 281 | } |
275 | } else { | 282 | } else { |
276 | MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback()); | 283 | MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback()); |
277 | - log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations); | ||
278 | - for (RuleNodeRelation relation : relations) { | 284 | + log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relationsByTypes); |
285 | + for (RuleNodeRelation relation : relationsByTypes) { | ||
279 | EntityId target = relation.getOut(); | 286 | EntityId target = relation.getOut(); |
280 | putToQueue(tpi, msg, callbackWrapper, target); | 287 | putToQueue(tpi, msg, callbackWrapper, target); |
281 | } | 288 | } |