Commit 7e96edd6ac901ab1020f4fa86371fcbe0b3ba032

Authored by Igor Kulikov
1 parent bf14c3a9

Improve TbMsgGeneratorNode to be executed in singleton mode

@@ -57,6 +57,7 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -57,6 +57,7 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
57 import scala.concurrent.duration.Duration; 57 import scala.concurrent.duration.Duration;
58 58
59 import java.util.Collections; 59 import java.util.Collections;
  60 +import java.util.Optional;
60 import java.util.Set; 61 import java.util.Set;
61 import java.util.concurrent.TimeUnit; 62 import java.util.concurrent.TimeUnit;
62 import java.util.function.Consumer; 63 import java.util.function.Consumer;
@@ -102,6 +103,12 @@ class DefaultTbContext implements TbContext { @@ -102,6 +103,12 @@ class DefaultTbContext implements TbContext {
102 scheduleMsgWithDelay(new RuleNodeToSelfMsg(msg), delayMs, nodeCtx.getSelfActor()); 103 scheduleMsgWithDelay(new RuleNodeToSelfMsg(msg), delayMs, nodeCtx.getSelfActor());
103 } 104 }
104 105
  106 + @Override
  107 + public boolean isLocalEntity(EntityId entityId) {
  108 + Optional<ServerAddress> address = mainCtx.getRoutingService().resolveById(entityId);
  109 + return !address.isPresent();
  110 + }
  111 +
105 private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) { 112 private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) {
106 mainCtx.getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, mainCtx.getActorSystem().dispatcher(), nodeCtx.getSelfActor()); 113 mainCtx.getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, mainCtx.getActorSystem().dispatcher(), nodeCtx.getSelfActor());
107 } 114 }
@@ -83,7 +83,9 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod @@ -83,7 +83,9 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
83 83
84 @Override 84 @Override
85 public void onClusterEventMsg(ClusterEventMsg msg) { 85 public void onClusterEventMsg(ClusterEventMsg msg) {
86 - 86 + if (tbNode != null) {
  87 + tbNode.onClusterEventMsg(defaultCtx, msg);
  88 + }
87 } 89 }
88 90
89 public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception { 91 public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception {
@@ -51,6 +51,8 @@ public interface TbContext { @@ -51,6 +51,8 @@ public interface TbContext {
51 51
52 void tellSelf(TbMsg msg, long delayMs); 52 void tellSelf(TbMsg msg, long delayMs);
53 53
  54 + boolean isLocalEntity(EntityId entityId);
  55 +
54 void tellFailure(TbMsg msg, Throwable th); 56 void tellFailure(TbMsg msg, Throwable th);
55 57
56 void updateSelf(RuleNode self); 58 void updateSelf(RuleNode self);
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.rule.engine.api; 16 package org.thingsboard.rule.engine.api;
17 17
18 import org.thingsboard.server.common.msg.TbMsg; 18 import org.thingsboard.server.common.msg.TbMsg;
  19 +import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
19 20
20 import java.util.concurrent.ExecutionException; 21 import java.util.concurrent.ExecutionException;
21 22
@@ -30,4 +31,6 @@ public interface TbNode { @@ -30,4 +31,6 @@ public interface TbNode {
30 31
31 void destroy(); 32 void destroy();
32 33
  34 + default void onClusterEventMsg(TbContext ctx, ClusterEventMsg msg) {}
  35 +
33 } 36 }
@@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
25 import org.thingsboard.server.common.data.plugin.ComponentType; 25 import org.thingsboard.server.common.data.plugin.ComponentType;
26 import org.thingsboard.server.common.msg.TbMsg; 26 import org.thingsboard.server.common.msg.TbMsg;
27 import org.thingsboard.server.common.msg.TbMsgMetaData; 27 import org.thingsboard.server.common.msg.TbMsgMetaData;
  28 +import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
28 29
29 import java.util.UUID; 30 import java.util.UUID;
30 import java.util.concurrent.TimeUnit; 31 import java.util.concurrent.TimeUnit;
@@ -56,6 +57,7 @@ public class TbMsgGeneratorNode implements TbNode { @@ -56,6 +57,7 @@ public class TbMsgGeneratorNode implements TbNode {
56 private EntityId originatorId; 57 private EntityId originatorId;
57 private UUID nextTickId; 58 private UUID nextTickId;
58 private TbMsg prevMsg; 59 private TbMsg prevMsg;
  60 + private volatile boolean initialized;
59 61
60 @Override 62 @Override
61 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { 63 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
@@ -66,21 +68,42 @@ public class TbMsgGeneratorNode implements TbNode { @@ -66,21 +68,42 @@ public class TbMsgGeneratorNode implements TbNode {
66 } else { 68 } else {
67 originatorId = ctx.getSelfId(); 69 originatorId = ctx.getSelfId();
68 } 70 }
69 - this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType");  
70 - scheduleTickMsg(ctx); 71 + updateGeneratorState(ctx);
  72 + }
  73 +
  74 + @Override
  75 + public void onClusterEventMsg(TbContext ctx, ClusterEventMsg msg) {
  76 + updateGeneratorState(ctx);
  77 + }
  78 +
  79 + private void updateGeneratorState(TbContext ctx) {
  80 + if (ctx.isLocalEntity(originatorId)) {
  81 + if (!initialized) {
  82 + initialized = true;
  83 + this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType");
  84 + scheduleTickMsg(ctx);
  85 + }
  86 + } else if (initialized) {
  87 + initialized = false;
  88 + destroy();
  89 + }
71 } 90 }
72 91
73 @Override 92 @Override
74 public void onMsg(TbContext ctx, TbMsg msg) { 93 public void onMsg(TbContext ctx, TbMsg msg) {
75 - if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { 94 + if (initialized && msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
76 withCallback(generate(ctx), 95 withCallback(generate(ctx),
77 m -> { 96 m -> {
78 - ctx.tellNext(m, SUCCESS);  
79 - scheduleTickMsg(ctx); 97 + if (initialized) {
  98 + ctx.tellNext(m, SUCCESS);
  99 + scheduleTickMsg(ctx);
  100 + }
80 }, 101 },
81 t -> { 102 t -> {
82 - ctx.tellFailure(msg, t);  
83 - scheduleTickMsg(ctx); 103 + if (initialized) {
  104 + ctx.tellFailure(msg, t);
  105 + scheduleTickMsg(ctx);
  106 + }
84 }); 107 });
85 } 108 }
86 } 109 }
@@ -102,8 +125,10 @@ public class TbMsgGeneratorNode implements TbNode { @@ -102,8 +125,10 @@ public class TbMsgGeneratorNode implements TbNode {
102 if (prevMsg == null) { 125 if (prevMsg == null) {
103 prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}"); 126 prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}");
104 } 127 }
105 - TbMsg generated = jsEngine.executeGenerate(prevMsg);  
106 - prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData()); 128 + if (initialized) {
  129 + TbMsg generated = jsEngine.executeGenerate(prevMsg);
  130 + prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());
  131 + }
107 return prevMsg; 132 return prevMsg;
108 }); 133 });
109 } 134 }
@@ -113,6 +138,7 @@ public class TbMsgGeneratorNode implements TbNode { @@ -113,6 +138,7 @@ public class TbMsgGeneratorNode implements TbNode {
113 prevMsg = null; 138 prevMsg = null;
114 if (jsEngine != null) { 139 if (jsEngine != null) {
115 jsEngine.destroy(); 140 jsEngine.destroy();
  141 + jsEngine = null;
116 } 142 }
117 } 143 }
118 } 144 }