Commit 52ffd5345cae984c8f91de1c94ed10ffa9da415e

Authored by Igor Kulikov
1 parent 5faf9df1

Refactoring of Generator Node

@@ -248,7 +248,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -248,7 +248,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
248 int relationsCount = relations.size(); 248 int relationsCount = relations.size();
249 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId(); 249 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
250 if (relationsCount == 0) { 250 if (relationsCount == 0) {
251 - queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition()); 251 + if (ackId != null) {
  252 + queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
  253 + }
252 } else if (relationsCount == 1) { 254 } else if (relationsCount == 1) {
253 for (RuleNodeRelation relation : relations) { 255 for (RuleNodeRelation relation : relations) {
254 pushToTarget(msg, relation.getOut(), relation.getType()); 256 pushToTarget(msg, relation.getOut(), relation.getType());
@@ -266,7 +268,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -266,7 +268,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
266 } 268 }
267 } 269 }
268 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues. 270 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
269 - queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition()); 271 + if (ackId != null) {
  272 + queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
  273 + }
270 } 274 }
271 } 275 }
272 276
@@ -47,11 +47,12 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @@ -47,11 +47,12 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
47 47
48 public class TbMsgGeneratorNode implements TbNode { 48 public class TbMsgGeneratorNode implements TbNode {
49 49
50 - public static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg"; 50 + private static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
51 51
52 private TbMsgGeneratorNodeConfiguration config; 52 private TbMsgGeneratorNodeConfiguration config;
53 private ScriptEngine jsEngine; 53 private ScriptEngine jsEngine;
54 private long delay; 54 private long delay;
  55 + private long lastScheduledTs;
55 private EntityId originatorId; 56 private EntityId originatorId;
56 private UUID nextTickId; 57 private UUID nextTickId;
57 private TbMsg prevMsg; 58 private TbMsg prevMsg;
@@ -66,28 +67,40 @@ public class TbMsgGeneratorNode implements TbNode { @@ -66,28 +67,40 @@ public class TbMsgGeneratorNode implements TbNode {
66 originatorId = ctx.getSelfId(); 67 originatorId = ctx.getSelfId();
67 } 68 }
68 this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType"); 69 this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType");
69 - sentTickMsg(ctx); 70 + scheduleTickMsg(ctx);
70 } 71 }
71 72
72 @Override 73 @Override
73 public void onMsg(TbContext ctx, TbMsg msg) { 74 public void onMsg(TbContext ctx, TbMsg msg) {
74 if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { 75 if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
75 withCallback(generate(ctx), 76 withCallback(generate(ctx),
76 - m -> {ctx.tellNext(m, SUCCESS); sentTickMsg(ctx);},  
77 - t -> {ctx.tellFailure(msg, t); sentTickMsg(ctx);}); 77 + m -> {
  78 + ctx.tellNext(m, SUCCESS);
  79 + scheduleTickMsg(ctx);
  80 + },
  81 + t -> {
  82 + ctx.tellFailure(msg, t);
  83 + scheduleTickMsg(ctx);
  84 + });
78 } 85 }
79 } 86 }
80 87
81 - private void sentTickMsg(TbContext ctx) { 88 + private void scheduleTickMsg(TbContext ctx) {
  89 + long curTs = System.currentTimeMillis();
  90 + if (lastScheduledTs == 0L) {
  91 + lastScheduledTs = curTs;
  92 + }
  93 + lastScheduledTs = lastScheduledTs + delay;
  94 + long curDelay = Math.max(0L, (lastScheduledTs - curTs));
82 TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), ""); 95 TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
83 nextTickId = tickMsg.getId(); 96 nextTickId = tickMsg.getId();
84 - ctx.tellSelf(tickMsg, delay); 97 + ctx.tellSelf(tickMsg, curDelay);
85 } 98 }
86 99
87 private ListenableFuture<TbMsg> generate(TbContext ctx) { 100 private ListenableFuture<TbMsg> generate(TbContext ctx) {
88 return ctx.getJsExecutor().executeAsync(() -> { 101 return ctx.getJsExecutor().executeAsync(() -> {
89 if (prevMsg == null) { 102 if (prevMsg == null) {
90 - prevMsg = ctx.newMsg( "", originatorId, new TbMsgMetaData(), "{}"); 103 + prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}");
91 } 104 }
92 TbMsg generated = jsEngine.executeGenerate(prevMsg); 105 TbMsg generated = jsEngine.executeGenerate(prevMsg);
93 prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData()); 106 prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());