Commit 8acc9409331452df5656d2263746c60b3c9cadfb

Authored by Volodymyr Babak
2 parents ed7623af 52ffd534

Merge branch 'master' of github.com:thingsboard/thingsboard

... ... @@ -111,6 +111,7 @@ public class AppActor extends RuleChainManagerActor {
111 111 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
112 112 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
113 113 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
  114 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
114 115 onToDeviceActorMsg((TenantAwareMsg) msg);
115 116 break;
116 117 case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
... ...
... ... @@ -29,11 +29,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
29 29 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
30 30 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
31 31
32   -import java.util.HashMap;
33   -import java.util.LinkedList;
34   -import java.util.Map;
35   -import java.util.Queue;
36   -import java.util.UUID;
  32 +import java.util.*;
37 33
38 34 /**
39 35 * @author Andrew Shvayka
... ... @@ -88,7 +84,17 @@ public class RpcManagerActor extends ContextAwareActor {
88 84
89 85 private void onMsg(RpcBroadcastMsg msg) {
90 86 log.debug("Forwarding msg to session actors {}", msg);
91   - sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
  87 + sessionActors.keySet().forEach(address -> {
  88 + ClusterAPIProtos.ClusterMessage msgWithServerAddress = msg.getMsg()
  89 + .toBuilder()
  90 + .setServerAddress(ClusterAPIProtos.ServerAddress
  91 + .newBuilder()
  92 + .setHost(address.getHost())
  93 + .setPort(address.getPort())
  94 + .build())
  95 + .build();
  96 + onMsg(msgWithServerAddress);
  97 + });
92 98 pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
93 99 }
94 100
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.ruleChain;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.common.data.id.RuleChainId;
  20 +import org.thingsboard.server.common.data.id.TenantId;
  21 +import org.thingsboard.server.common.msg.MsgType;
  22 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
  23 +import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
  24 +
  25 +/**
  26 + * Created by ashvayka on 19.03.18.
  27 + */
  28 +@Data
  29 +final class RemoteToRuleChainTellNextMsg extends RuleNodeToRuleChainTellNextMsg implements TenantAwareMsg, RuleChainAwareMsg {
  30 +
  31 + private final TenantId tenantId;
  32 + private final RuleChainId ruleChainId;
  33 +
  34 + public RemoteToRuleChainTellNextMsg(RuleNodeToRuleChainTellNextMsg original, TenantId tenantId, RuleChainId ruleChainId) {
  35 + super(original.getOriginator(), original.getRelationTypes(), original.getMsg());
  36 + this.tenantId = tenantId;
  37 + this.ruleChainId = ruleChainId;
  38 + }
  39 +
  40 + @Override
  41 + public MsgType getMsgType() {
  42 + return MsgType.REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG;
  43 + }
  44 +
  45 +}
... ...
... ... @@ -49,6 +49,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
49 49 processor.onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg);
50 50 break;
51 51 case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  52 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
52 53 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
53 54 break;
54 55 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
... ...
... ... @@ -20,6 +20,9 @@ import akka.actor.ActorRef;
20 20 import akka.actor.Props;
21 21 import akka.event.LoggingAdapter;
22 22 import com.datastax.driver.core.utils.UUIDs;
  23 +
  24 +import java.util.Optional;
  25 +
23 26 import org.thingsboard.server.actors.ActorSystemContext;
24 27 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
25 28 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
... ... @@ -37,6 +40,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
37 40 import org.thingsboard.server.common.data.rule.RuleNode;
38 41 import org.thingsboard.server.common.msg.TbMsg;
39 42 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  43 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
40 44 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
41 45 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
42 46 import org.thingsboard.server.dao.rule.RuleChainService;
... ... @@ -217,16 +221,36 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
217 221
218 222 void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
219 223 checkActive();
220   - RuleNodeId originator = envelope.getOriginator();
221   - List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()
222   - .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
223   - .collect(Collectors.toList());
  224 + TbMsg msg = envelope.getMsg();
  225 + EntityId originatorEntityId = msg.getOriginator();
  226 + Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(originatorEntityId);
  227 +
  228 + if (address.isPresent()) {
  229 + onRemoteTellNext(address.get(), envelope);
  230 + } else {
  231 + onLocalTellNext(envelope);
  232 + }
  233 + }
  234 +
  235 + private void onRemoteTellNext(ServerAddress serverAddress, RuleNodeToRuleChainTellNextMsg envelope) {
  236 + TbMsg msg = envelope.getMsg();
  237 + logger.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
  238 + envelope = new RemoteToRuleChainTellNextMsg(envelope, tenantId, entityId);
  239 + systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(serverAddress, envelope));
  240 + }
224 241
  242 + private void onLocalTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
225 243 TbMsg msg = envelope.getMsg();
  244 + RuleNodeId originatorNodeId = envelope.getOriginator();
  245 + List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
  246 + .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
  247 + .collect(Collectors.toList());
226 248 int relationsCount = relations.size();
227 249 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
228 250 if (relationsCount == 0) {
229   - queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
  251 + if (ackId != null) {
  252 + queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
  253 + }
230 254 } else if (relationsCount == 1) {
231 255 for (RuleNodeRelation relation : relations) {
232 256 pushToTarget(msg, relation.getOut(), relation.getType());
... ... @@ -244,7 +268,9 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
244 268 }
245 269 }
246 270 //TODO: Ideally this should happen in async way when all targets confirm that the copied messages are successfully written to corresponding target queues.
247   - queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
  271 + if (ackId != null) {
  272 + queue.ack(tenantId, msg, ackId.getId(), msg.getClusterPartition());
  273 + }
248 274 }
249 275 }
250 276
... ...
... ... @@ -20,12 +20,13 @@ import org.thingsboard.server.common.data.id.RuleChainId;
20 20 import org.thingsboard.server.common.msg.MsgType;
21 21 import org.thingsboard.server.common.msg.TbActorMsg;
22 22 import org.thingsboard.server.common.msg.TbMsg;
  23 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
23 24
24 25 /**
25 26 * Created by ashvayka on 19.03.18.
26 27 */
27 28 @Data
28   -public final class RuleChainToRuleChainMsg implements TbActorMsg {
  29 +public final class RuleChainToRuleChainMsg implements TbActorMsg, RuleChainAwareMsg {
29 30
30 31 private final RuleChainId target;
31 32 private final RuleChainId source;
... ... @@ -34,6 +35,11 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg {
34 35 private final boolean enqueue;
35 36
36 37 @Override
  38 + public RuleChainId getRuleChainId() {
  39 + return target;
  40 + }
  41 +
  42 + @Override
37 43 public MsgType getMsgType() {
38 44 return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
39 45 }
... ...
... ... @@ -27,7 +27,7 @@ import java.util.Set;
27 27 * Created by ashvayka on 19.03.18.
28 28 */
29 29 @Data
30   -final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
  30 +class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
31 31
32 32 private final RuleNodeId originator;
33 33 private final Set<String> relationTypes;
... ...
... ... @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId;
35 35 import org.thingsboard.server.common.data.rule.RuleChain;
36 36 import org.thingsboard.server.common.msg.TbActorMsg;
37 37 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
  38 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
38 39 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
39 40 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
40 41 import scala.concurrent.duration.Duration;
... ... @@ -94,7 +95,8 @@ public class TenantActor extends RuleChainManagerActor {
94 95 onToDeviceActorMsg((DeviceAwareMsg) msg);
95 96 break;
96 97 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
97   - onRuleChainMsg((RuleChainToRuleChainMsg) msg);
  98 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  99 + onRuleChainMsg((RuleChainAwareMsg) msg);
98 100 break;
99 101 default:
100 102 return false;
... ... @@ -120,8 +122,8 @@ public class TenantActor extends RuleChainManagerActor {
120 122 else logger.info("[{}] No Root Chain", msg);
121 123 }
122 124
123   - private void onRuleChainMsg(RuleChainToRuleChainMsg msg) {
124   - ruleChainManager.getOrCreateActor(context(), msg.getTarget()).tell(msg, self());
  125 + private void onRuleChainMsg(RuleChainAwareMsg msg) {
  126 + ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
125 127 }
126 128
127 129
... ...
... ... @@ -63,6 +63,11 @@ public enum MsgType {
63 63 RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
64 64
65 65 /**
  66 + * Message forwarded from original rule chain to remote rule chain due to change in the cluster structure or originator entity of the TbMsg.
  67 + */
  68 + REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG,
  69 +
  70 + /**
66 71 * Message that is sent by RuleActor implementation to RuleActor itself to log the error.
67 72 */
68 73 RULE_TO_SELF_ERROR_MSG,
... ... @@ -101,6 +106,10 @@ public enum MsgType {
101 106 /**
102 107 * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
103 108 */
104   - RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG;
  109 + RULE_ENGINE_QUEUE_PUT_ACK_MSG,
  110 + ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
  111 + TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
  112 + SESSION_TIMEOUT_MSG,
  113 + SESSION_CTRL_MSG;
105 114
106 115 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.aware;
  17 +
  18 +import org.thingsboard.server.common.data.id.RuleChainId;
  19 +
  20 +public interface RuleChainAwareMsg {
  21 +
  22 + RuleChainId getRuleChainId();
  23 +
  24 +}
... ...
... ... @@ -47,11 +47,12 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
47 47
48 48 public class TbMsgGeneratorNode implements TbNode {
49 49
50   - public static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
  50 + private static final String TB_MSG_GENERATOR_NODE_MSG = "TbMsgGeneratorNodeMsg";
51 51
52 52 private TbMsgGeneratorNodeConfiguration config;
53 53 private ScriptEngine jsEngine;
54 54 private long delay;
  55 + private long lastScheduledTs;
55 56 private EntityId originatorId;
56 57 private UUID nextTickId;
57 58 private TbMsg prevMsg;
... ... @@ -66,28 +67,40 @@ public class TbMsgGeneratorNode implements TbNode {
66 67 originatorId = ctx.getSelfId();
67 68 }
68 69 this.jsEngine = ctx.createJsScriptEngine(config.getJsScript(), "prevMsg", "prevMetadata", "prevMsgType");
69   - sentTickMsg(ctx);
  70 + scheduleTickMsg(ctx);
70 71 }
71 72
72 73 @Override
73 74 public void onMsg(TbContext ctx, TbMsg msg) {
74 75 if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) {
75 76 withCallback(generate(ctx),
76   - m -> {ctx.tellNext(m, SUCCESS); sentTickMsg(ctx);},
77   - t -> {ctx.tellFailure(msg, t); sentTickMsg(ctx);});
  77 + m -> {
  78 + ctx.tellNext(m, SUCCESS);
  79 + scheduleTickMsg(ctx);
  80 + },
  81 + t -> {
  82 + ctx.tellFailure(msg, t);
  83 + scheduleTickMsg(ctx);
  84 + });
78 85 }
79 86 }
80 87
81   - private void sentTickMsg(TbContext ctx) {
  88 + private void scheduleTickMsg(TbContext ctx) {
  89 + long curTs = System.currentTimeMillis();
  90 + if (lastScheduledTs == 0L) {
  91 + lastScheduledTs = curTs;
  92 + }
  93 + lastScheduledTs = lastScheduledTs + delay;
  94 + long curDelay = Math.max(0L, (lastScheduledTs - curTs));
82 95 TbMsg tickMsg = ctx.newMsg(TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), "");
83 96 nextTickId = tickMsg.getId();
84   - ctx.tellSelf(tickMsg, delay);
  97 + ctx.tellSelf(tickMsg, curDelay);
85 98 }
86 99
87 100 private ListenableFuture<TbMsg> generate(TbContext ctx) {
88 101 return ctx.getJsExecutor().executeAsync(() -> {
89 102 if (prevMsg == null) {
90   - prevMsg = ctx.newMsg( "", originatorId, new TbMsgMetaData(), "{}");
  103 + prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}");
91 104 }
92 105 TbMsg generated = jsEngine.executeGenerate(prevMsg);
93 106 prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData());
... ...