Commit d42df1d5b5d8e79ba988d5b7f80092ac943530ca

Authored by Andrew Shvayka
1 parent 15d7bf7c

Automatic message routing during tellNext based on originator ID

@@ -111,6 +111,7 @@ public class AppActor extends RuleChainManagerActor { @@ -111,6 +111,7 @@ public class AppActor extends RuleChainManagerActor {
111 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: 111 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
112 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: 112 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
113 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 113 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
  114 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
114 onToDeviceActorMsg((TenantAwareMsg) msg); 115 onToDeviceActorMsg((TenantAwareMsg) msg);
115 break; 116 break;
116 case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG: 117 case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.ruleChain;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.common.data.id.RuleChainId;
  20 +import org.thingsboard.server.common.data.id.TenantId;
  21 +import org.thingsboard.server.common.msg.MsgType;
  22 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
  23 +import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
  24 +
  25 +/**
  26 + * Created by ashvayka on 19.03.18.
  27 + */
  28 +@Data
  29 +final class RemoteToRuleChainTellNextMsg extends RuleNodeToRuleChainTellNextMsg implements TenantAwareMsg, RuleChainAwareMsg {
  30 +
  31 + private final TenantId tenantId;
  32 + private final RuleChainId ruleChainId;
  33 +
  34 + public RemoteToRuleChainTellNextMsg(RuleNodeToRuleChainTellNextMsg original, TenantId tenantId, RuleChainId ruleChainId) {
  35 + super(original.getOriginator(), original.getRelationTypes(), original.getMsg());
  36 + this.tenantId = tenantId;
  37 + this.ruleChainId = ruleChainId;
  38 + }
  39 +
  40 + @Override
  41 + public MsgType getMsgType() {
  42 + return MsgType.REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG;
  43 + }
  44 +
  45 +}
@@ -49,6 +49,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe @@ -49,6 +49,7 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
49 processor.onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg); 49 processor.onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg);
50 break; 50 break;
51 case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG: 51 case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  52 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
52 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg); 53 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
53 break; 54 break;
54 case RULE_CHAIN_TO_RULE_CHAIN_MSG: 55 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
@@ -20,6 +20,9 @@ import akka.actor.ActorRef; @@ -20,6 +20,9 @@ import akka.actor.ActorRef;
20 import akka.actor.Props; 20 import akka.actor.Props;
21 import akka.event.LoggingAdapter; 21 import akka.event.LoggingAdapter;
22 import com.datastax.driver.core.utils.UUIDs; 22 import com.datastax.driver.core.utils.UUIDs;
  23 +
  24 +import java.util.Optional;
  25 +
23 import org.thingsboard.server.actors.ActorSystemContext; 26 import org.thingsboard.server.actors.ActorSystemContext;
24 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg; 27 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
25 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg; 28 import org.thingsboard.server.actors.device.RuleEngineQueuePutAckMsg;
@@ -37,6 +40,7 @@ import org.thingsboard.server.common.data.rule.RuleChain; @@ -37,6 +40,7 @@ import org.thingsboard.server.common.data.rule.RuleChain;
37 import org.thingsboard.server.common.data.rule.RuleNode; 40 import org.thingsboard.server.common.data.rule.RuleNode;
38 import org.thingsboard.server.common.msg.TbMsg; 41 import org.thingsboard.server.common.msg.TbMsg;
39 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; 42 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  43 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
40 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; 44 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
41 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 45 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
42 import org.thingsboard.server.dao.rule.RuleChainService; 46 import org.thingsboard.server.dao.rule.RuleChainService;
@@ -217,12 +221,30 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh @@ -217,12 +221,30 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
217 221
218 void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) { 222 void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
219 checkActive(); 223 checkActive();
220 - RuleNodeId originator = envelope.getOriginator();  
221 - List<RuleNodeRelation> relations = nodeRoutes.get(originator).stream()  
222 - .filter(r -> contains(envelope.getRelationTypes(), r.getType()))  
223 - .collect(Collectors.toList()); 224 + TbMsg msg = envelope.getMsg();
  225 + EntityId originatorEntityId = msg.getOriginator();
  226 + Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(originatorEntityId);
  227 +
  228 + if (address.isPresent()) {
  229 + onRemoteTellNext(address.get(), envelope);
  230 + } else {
  231 + onLocalTellNext(envelope);
  232 + }
  233 + }
224 234
  235 + private void onRemoteTellNext(ServerAddress serverAddress, RuleNodeToRuleChainTellNextMsg envelope) {
225 TbMsg msg = envelope.getMsg(); 236 TbMsg msg = envelope.getMsg();
  237 + logger.debug("Forwarding [{}] msg to remote server [{}] due to changed originator id: [{}]", msg.getId(), serverAddress, msg.getOriginator());
  238 + envelope = new RemoteToRuleChainTellNextMsg(envelope, tenantId, entityId);
  239 + systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(serverAddress, envelope));
  240 + }
  241 +
  242 + private void onLocalTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
  243 + TbMsg msg = envelope.getMsg();
  244 + RuleNodeId originatorNodeId = envelope.getOriginator();
  245 + List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
  246 + .filter(r -> contains(envelope.getRelationTypes(), r.getType()))
  247 + .collect(Collectors.toList());
226 int relationsCount = relations.size(); 248 int relationsCount = relations.size();
227 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId(); 249 EntityId ackId = msg.getRuleNodeId() != null ? msg.getRuleNodeId() : msg.getRuleChainId();
228 if (relationsCount == 0) { 250 if (relationsCount == 0) {
@@ -20,12 +20,13 @@ import org.thingsboard.server.common.data.id.RuleChainId; @@ -20,12 +20,13 @@ import org.thingsboard.server.common.data.id.RuleChainId;
20 import org.thingsboard.server.common.msg.MsgType; 20 import org.thingsboard.server.common.msg.MsgType;
21 import org.thingsboard.server.common.msg.TbActorMsg; 21 import org.thingsboard.server.common.msg.TbActorMsg;
22 import org.thingsboard.server.common.msg.TbMsg; 22 import org.thingsboard.server.common.msg.TbMsg;
  23 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
23 24
24 /** 25 /**
25 * Created by ashvayka on 19.03.18. 26 * Created by ashvayka on 19.03.18.
26 */ 27 */
27 @Data 28 @Data
28 -public final class RuleChainToRuleChainMsg implements TbActorMsg { 29 +public final class RuleChainToRuleChainMsg implements TbActorMsg, RuleChainAwareMsg {
29 30
30 private final RuleChainId target; 31 private final RuleChainId target;
31 private final RuleChainId source; 32 private final RuleChainId source;
@@ -34,6 +35,11 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg { @@ -34,6 +35,11 @@ public final class RuleChainToRuleChainMsg implements TbActorMsg {
34 private final boolean enqueue; 35 private final boolean enqueue;
35 36
36 @Override 37 @Override
  38 + public RuleChainId getRuleChainId() {
  39 + return target;
  40 + }
  41 +
  42 + @Override
37 public MsgType getMsgType() { 43 public MsgType getMsgType() {
38 return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG; 44 return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
39 } 45 }
@@ -27,7 +27,7 @@ import java.util.Set; @@ -27,7 +27,7 @@ import java.util.Set;
27 * Created by ashvayka on 19.03.18. 27 * Created by ashvayka on 19.03.18.
28 */ 28 */
29 @Data 29 @Data
30 -final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg { 30 +class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
31 31
32 private final RuleNodeId originator; 32 private final RuleNodeId originator;
33 private final Set<String> relationTypes; 33 private final Set<String> relationTypes;
@@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId; @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId;
35 import org.thingsboard.server.common.data.rule.RuleChain; 35 import org.thingsboard.server.common.data.rule.RuleChain;
36 import org.thingsboard.server.common.msg.TbActorMsg; 36 import org.thingsboard.server.common.msg.TbActorMsg;
37 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; 37 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
  38 +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
38 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; 39 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
39 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 40 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
40 import scala.concurrent.duration.Duration; 41 import scala.concurrent.duration.Duration;
@@ -94,7 +95,8 @@ public class TenantActor extends RuleChainManagerActor { @@ -94,7 +95,8 @@ public class TenantActor extends RuleChainManagerActor {
94 onToDeviceActorMsg((DeviceAwareMsg) msg); 95 onToDeviceActorMsg((DeviceAwareMsg) msg);
95 break; 96 break;
96 case RULE_CHAIN_TO_RULE_CHAIN_MSG: 97 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
97 - onRuleChainMsg((RuleChainToRuleChainMsg) msg); 98 + case REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  99 + onRuleChainMsg((RuleChainAwareMsg) msg);
98 break; 100 break;
99 default: 101 default:
100 return false; 102 return false;
@@ -120,8 +122,8 @@ public class TenantActor extends RuleChainManagerActor { @@ -120,8 +122,8 @@ public class TenantActor extends RuleChainManagerActor {
120 else logger.info("[{}] No Root Chain", msg); 122 else logger.info("[{}] No Root Chain", msg);
121 } 123 }
122 124
123 - private void onRuleChainMsg(RuleChainToRuleChainMsg msg) {  
124 - ruleChainManager.getOrCreateActor(context(), msg.getTarget()).tell(msg, self()); 125 + private void onRuleChainMsg(RuleChainAwareMsg msg) {
  126 + ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
125 } 127 }
126 128
127 129
@@ -63,6 +63,11 @@ public enum MsgType { @@ -63,6 +63,11 @@ public enum MsgType {
63 RULE_TO_RULE_CHAIN_TELL_NEXT_MSG, 63 RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
64 64
65 /** 65 /**
  66 + * Message forwarded from original rule chain to remote rule chain due to change in the cluster structure or originator entity of the TbMsg.
  67 + */
  68 + REMOTE_TO_RULE_CHAIN_TELL_NEXT_MSG,
  69 +
  70 + /**
66 * Message that is sent by RuleActor implementation to RuleActor itself to log the error. 71 * Message that is sent by RuleActor implementation to RuleActor itself to log the error.
67 */ 72 */
68 RULE_TO_SELF_ERROR_MSG, 73 RULE_TO_SELF_ERROR_MSG,
@@ -101,6 +106,10 @@ public enum MsgType { @@ -101,6 +106,10 @@ public enum MsgType {
101 /** 106 /**
102 * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue. 107 * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.
103 */ 108 */
104 - RULE_ENGINE_QUEUE_PUT_ACK_MSG, ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG; 109 + RULE_ENGINE_QUEUE_PUT_ACK_MSG,
  110 + ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,
  111 + TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
  112 + SESSION_TIMEOUT_MSG,
  113 + SESSION_CTRL_MSG;
105 114
106 } 115 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.aware;
  17 +
  18 +import org.thingsboard.server.common.data.id.RuleChainId;
  19 +
  20 +public interface RuleChainAwareMsg {
  21 +
  22 + RuleChainId getRuleChainId();
  23 +
  24 +}