Commit 8f53d4a255b097c157cff9d410c9b1cea7bc0ddc
Committed by
GitHub
Merge pull request #5181 from smatvienko-tb/RuleChainActorMessageProcessor-onTel…
…lNext-NullPointerException-fix Rule chain actor message processor on tell next NullPointerException fix
Showing
1 changed file
with
13 additions
and
5 deletions
... | ... | @@ -250,10 +250,17 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
250 | 250 | checkActive(msg); |
251 | 251 | EntityId entityId = msg.getOriginator(); |
252 | 252 | TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId); |
253 | - List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream() | |
253 | + | |
254 | + List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId); | |
255 | + if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore | |
256 | + log.warn("[{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message.", tenantId, entityId, msg.getId()); | |
257 | + ruleNodeRelations = Collections.emptyList(); | |
258 | + } | |
259 | + | |
260 | + List<RuleNodeRelation> relationsByTypes = ruleNodeRelations.stream() | |
254 | 261 | .filter(r -> contains(relationTypes, r.getType())) |
255 | 262 | .collect(Collectors.toList()); |
256 | - int relationsCount = relations.size(); | |
263 | + int relationsCount = relationsByTypes.size(); | |
257 | 264 | if (relationsCount == 0) { |
258 | 265 | log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId()); |
259 | 266 | if (relationTypes.contains(TbRelationTypes.FAILURE)) { |
... | ... | @@ -268,14 +275,14 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
268 | 275 | msg.getCallback().onSuccess(); |
269 | 276 | } |
270 | 277 | } else if (relationsCount == 1) { |
271 | - for (RuleNodeRelation relation : relations) { | |
278 | + for (RuleNodeRelation relation : relationsByTypes) { | |
272 | 279 | log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut()); |
273 | 280 | pushToTarget(tpi, msg, relation.getOut(), relation.getType()); |
274 | 281 | } |
275 | 282 | } else { |
276 | 283 | MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback()); |
277 | - log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations); | |
278 | - for (RuleNodeRelation relation : relations) { | |
284 | + log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relationsByTypes); | |
285 | + for (RuleNodeRelation relation : relationsByTypes) { | |
279 | 286 | EntityId target = relation.getOut(); |
280 | 287 | putToQueue(tpi, msg, callbackWrapper, target); |
281 | 288 | } |
... | ... | @@ -283,6 +290,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
283 | 290 | } catch (RuleNodeException rne) { |
284 | 291 | msg.getCallback().onFailure(rne); |
285 | 292 | } catch (Exception e) { |
293 | + log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e); | |
286 | 294 | msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage())); |
287 | 295 | } |
288 | 296 | } | ... | ... |