|
@@ -174,15 +174,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|
@@ -174,15 +174,16 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
174
|
context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());
|
174
|
context.parent().tell(new RuleToPluginMsgWrapper(pluginTenantId, pluginId, tenantId, entityId, ruleToPluginMsg), context.self());
|
175
|
if (action.isOneWayAction()) {
|
175
|
if (action.isOneWayAction()) {
|
176
|
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
|
176
|
pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
|
|
|
177
|
+ return;
|
177
|
} else {
|
178
|
} else {
|
178
|
pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);
|
179
|
pendingMsgMap.put(ruleToPluginMsg.getUid(), msg);
|
179
|
scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout());
|
180
|
scheduleMsgWithDelay(context, new RuleToPluginTimeoutMsg(ruleToPluginMsg.getUid()), systemContext.getPluginProcessingTimeout());
|
|
|
181
|
+ return;
|
180
|
}
|
182
|
}
|
181
|
}
|
183
|
}
|
182
|
- } else {
|
|
|
183
|
- logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId);
|
|
|
184
|
- pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
|
|
|
185
|
}
|
184
|
}
|
|
|
185
|
+ logger.debug("[{}] Nothing to send to plugin: {}", entityId, pluginId);
|
|
|
186
|
+ pushToNextRule(context, msg.getCtx(), RuleEngineError.NO_TWO_WAY_ACTIONS);
|
186
|
}
|
187
|
}
|
187
|
|
188
|
|
188
|
void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) {
|
189
|
void onPluginMsg(ActorContext context, PluginToRuleMsg<?> msg) {
|
|
@@ -215,13 +216,13 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
|
@@ -215,13 +216,13 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> { |
215
|
ctx = ctx.withError(error);
|
216
|
ctx = ctx.withError(error);
|
216
|
}
|
217
|
}
|
217
|
if (ctx.isFailure()) {
|
218
|
if (ctx.isFailure()) {
|
218
|
- logger.debug("[{}] Forwarding processing chain to device actor due to failure.", ctx.getInMsg().getDeviceId());
|
219
|
+ logger.debug("[{}][{}] Forwarding processing chain to device actor due to failure.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
|
219
|
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
|
220
|
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
|
220
|
} else if (!ctx.hasNext()) {
|
221
|
} else if (!ctx.hasNext()) {
|
221
|
- logger.debug("[{}] Forwarding processing chain to device actor due to end of chain.", ctx.getInMsg().getDeviceId());
|
222
|
+ logger.debug("[{}][{}] Forwarding processing chain to device actor due to end of chain.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
|
222
|
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
|
223
|
ctx.getDeviceActor().tell(new RulesProcessedMsg(ctx), ActorRef.noSender());
|
223
|
} else {
|
224
|
} else {
|
224
|
- logger.debug("[{}] Forwarding processing chain to next rule actor.", ctx.getInMsg().getDeviceId());
|
225
|
+ logger.debug("[{}][{}] Forwarding processing chain to next rule actor.", ruleMd.getId(), ctx.getInMsg().getDeviceId());
|
225
|
ChainProcessingContext nextTask = ctx.getNext();
|
226
|
ChainProcessingContext nextTask = ctx.getNext();
|
226
|
nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self());
|
227
|
nextTask.getCurrentActor().tell(new RuleProcessingMsg(nextTask), context.self());
|
227
|
}
|
228
|
}
|