Commit f8929de57861fedd199d0538af5ef823b385dbbc

Authored by Andrew Shvayka
1 parent 00352231

Adding RuleChain and RuleNode actors

Showing 36 changed files with 670 additions and 195 deletions
... ... @@ -54,6 +54,10 @@
54 54 <artifactId>extensions-api</artifactId>
55 55 </dependency>
56 56 <dependency>
  57 + <groupId>org.thingsboard.rule-engine</groupId>
  58 + <artifactId>rule-engine-api</artifactId>
  59 + </dependency>
  60 + <dependency>
57 61 <groupId>org.thingsboard</groupId>
58 62 <artifactId>extensions-core</artifactId>
59 63 </dependency>
... ...
... ... @@ -136,6 +136,12 @@ public class ActorSystemContext {
136 136 @Value("${actors.plugin.error_persist_frequency}")
137 137 @Getter private long pluginErrorPersistFrequency;
138 138
  139 + @Value("${actors.rule.chain.error_persist_frequency}")
  140 + @Getter private long ruleChainErrorPersistFrequency;
  141 +
  142 + @Value("${actors.rule.node.error_persist_frequency}")
  143 + @Getter private long ruleNodeErrorPersistFrequency;
  144 +
139 145 @Value("${actors.rule.termination.delay}")
140 146 @Getter private long ruleActorTerminationDelay;
141 147
... ...
... ... @@ -33,9 +33,11 @@ import org.thingsboard.server.common.data.id.PluginId;
33 33 import org.thingsboard.server.common.data.id.RuleChainId;
34 34 import org.thingsboard.server.common.data.id.TenantId;
35 35 import org.thingsboard.server.common.data.page.PageDataIterable;
  36 +import org.thingsboard.server.common.msg.TbActorMsg;
36 37 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
37 38 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
38 39 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
  40 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
39 41 import org.thingsboard.server.dao.model.ModelConstants;
40 42 import org.thingsboard.server.dao.tenant.TenantService;
41 43 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
... ... @@ -86,27 +88,48 @@ public class AppActor extends RuleChainManagerActor {
86 88 }
87 89
88 90 @Override
89   - public void onReceive(Object msg) throws Exception {
90   - logger.debug("Received message: {}", msg);
91   - if (msg instanceof ToDeviceActorMsg) {
92   - processDeviceMsg((ToDeviceActorMsg) msg);
93   - } else if (msg instanceof ToPluginActorMsg) {
94   - onToPluginMsg((ToPluginActorMsg) msg);
95   - } else if (msg instanceof ToDeviceActorNotificationMsg) {
96   - onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
97   - } else if (msg instanceof Terminated) {
98   - processTermination((Terminated) msg);
99   - } else if (msg instanceof ClusterEventMsg) {
100   - broadcast(msg);
101   - } else if (msg instanceof ComponentLifecycleMsg) {
102   - onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
103   - } else if (msg instanceof PluginTerminationMsg) {
104   - onPluginTerminated((PluginTerminationMsg) msg);
  91 + protected void process(TbActorMsg msg) {
  92 + switch (msg.getMsgType()) {
  93 + case COMPONENT_LIFE_CYCLE_MSG:
  94 + onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
  95 + break;
  96 + case SERVICE_TO_RULE_ENGINE_MSG:
  97 + onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
  98 + break;
  99 + }
  100 + }
  101 +
  102 + private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
  103 + if (SYSTEM_TENANT.equals(msg.getTenantId())) {
  104 + //TODO: ashvayka handle this.
105 105 } else {
106   - logger.warning("Unknown message: {}!", msg);
  106 + getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
107 107 }
108 108 }
109 109
  110 +
  111 +// @Override
  112 +// public void onReceive(Object msg) throws Exception {
  113 +// logger.debug("Received message: {}", msg);
  114 +// if (msg instanceof ToDeviceActorMsg) {
  115 +// processDeviceMsg((ToDeviceActorMsg) msg);
  116 +// } else if (msg instanceof ToPluginActorMsg) {
  117 +// onToPluginMsg((ToPluginActorMsg) msg);
  118 +// } else if (msg instanceof ToDeviceActorNotificationMsg) {
  119 +// onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
  120 +// } else if (msg instanceof Terminated) {
  121 +// processTermination((Terminated) msg);
  122 +// } else if (msg instanceof ClusterEventMsg) {
  123 +// broadcast(msg);
  124 +// } else if (msg instanceof ComponentLifecycleMsg) {
  125 +// onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
  126 +// } else if (msg instanceof PluginTerminationMsg) {
  127 +// onPluginTerminated((PluginTerminationMsg) msg);
  128 +// } else {
  129 +// logger.warning("Unknown message: {}!", msg);
  130 +// }
  131 +// }
  132 +
110 133 private void onPluginTerminated(PluginTerminationMsg msg) {
111 134 pluginManager.remove(msg.getId());
112 135 }
... ...
... ... @@ -57,7 +57,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
57 57 }
58 58
59 59 @Override
60   - public void start() throws Exception {
  60 + public void start(ActorContext context) throws Exception {
61 61 logger.info("[{}] Going to start plugin actor.", entityId);
62 62 pluginMd = systemContext.getPluginService().findPluginById(entityId);
63 63 if (pluginMd == null) {
... ... @@ -76,7 +76,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
76 76 }
77 77
78 78 @Override
79   - public void stop() throws Exception {
  79 + public void stop(ActorContext context) throws Exception {
80 80 onStop();
81 81 }
82 82
... ... @@ -191,7 +191,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
191 191 if (pluginImpl != null) {
192 192 pluginImpl.stop(trustedCtx);
193 193 }
194   - start();
  194 + start(context);
195 195 }
196 196 }
197 197
... ... @@ -217,7 +217,7 @@ public class PluginActorMessageProcessor extends ComponentMsgProcessor<PluginId>
217 217 pluginImpl.resume(trustedCtx);
218 218 logger.info("[{}] Plugin resumed.", entityId);
219 219 } else {
220   - start();
  220 + start(context);
221 221 }
222 222 }
223 223
... ...
... ... @@ -68,7 +68,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
68 68 }
69 69
70 70 @Override
71   - public void start() throws Exception {
  71 + public void start(ActorContext context) throws Exception {
72 72 logger.info("[{}][{}] Starting rule actor.", entityId, tenantId);
73 73 ruleMd = systemContext.getRuleService().findRuleById(entityId);
74 74 if (ruleMd == null) {
... ... @@ -86,7 +86,7 @@ class RuleActorMessageProcessor extends ComponentMsgProcessor<RuleId> {
86 86 }
87 87
88 88 @Override
89   - public void stop() throws Exception {
  89 + public void stop(ActorContext context) throws Exception {
90 90 onStop();
91 91 }
92 92
... ...
  1 +package org.thingsboard.server.actors.ruleChain;
  2 +
  3 +import org.thingsboard.rule.engine.api.TbContext;
  4 +import org.thingsboard.server.actors.ActorSystemContext;
  5 +import org.thingsboard.server.common.msg.TbMsg;
  6 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  7 +import org.thingsboard.server.dao.attributes.AttributesService;
  8 +
  9 +/**
  10 + * Created by ashvayka on 19.03.18.
  11 + */
  12 +class DefaultTbContext implements TbContext {
  13 +
  14 + private final ActorSystemContext mainCtx;
  15 + private final RuleNodeCtx nodeCtx;
  16 +
  17 + public DefaultTbContext(ActorSystemContext mainCtx, RuleNodeCtx nodeCtx) {
  18 + this.mainCtx = mainCtx;
  19 + this.nodeCtx = nodeCtx;
  20 + }
  21 +
  22 + @Override
  23 + public void tellNext(TbMsg msg) {
  24 + tellNext(msg, null);
  25 + }
  26 +
  27 + @Override
  28 + public void tellNext(TbMsg msg, String relationType) {
  29 + nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelfId(), relationType, msg), nodeCtx.getSelf());
  30 + }
  31 +
  32 + @Override
  33 + public void tellSelf(TbMsg msg, long delayMs) {
  34 +
  35 + }
  36 +
  37 + @Override
  38 + public void tellOthers(TbMsg msg) {
  39 +
  40 + }
  41 +
  42 + @Override
  43 + public void tellSibling(TbMsg msg, ServerAddress address) {
  44 +
  45 + }
  46 +
  47 + @Override
  48 + public void spawn(TbMsg msg) {
  49 +
  50 + }
  51 +
  52 + @Override
  53 + public void ack(TbMsg msg) {
  54 +
  55 + }
  56 +
  57 + @Override
  58 + public AttributesService getAttributesService() {
  59 + return mainCtx.getAttributesService();
  60 + }
  61 +}
... ...
... ... @@ -20,6 +20,9 @@ import org.thingsboard.server.actors.service.ComponentActor;
20 20 import org.thingsboard.server.actors.service.ContextBasedCreator;
21 21 import org.thingsboard.server.common.data.id.RuleChainId;
22 22 import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.common.msg.TbActorMsg;
  24 +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
  25 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
23 26
24 27 public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> {
25 28
... ... @@ -30,8 +33,18 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
30 33 }
31 34
32 35 @Override
33   - public void onReceive(Object msg) throws Exception {
34   - logger.debug("[{}][{}] Unknown msg type.", tenantId, id, msg.getClass().getName());
  36 + protected void process(TbActorMsg msg) {
  37 + switch (msg.getMsgType()) {
  38 + case COMPONENT_LIFE_CYCLE_MSG:
  39 + onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
  40 + break;
  41 + case SERVICE_TO_RULE_ENGINE_MSG:
  42 + processor.onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
  43 + break;
  44 + case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  45 + processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
  46 + break;
  47 + }
35 48 }
36 49
37 50 public static class ActorCreator extends ContextBasedCreator<RuleChainActor> {
... ... @@ -54,6 +67,6 @@ public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMe
54 67
55 68 @Override
56 69 protected long getErrorPersistFrequency() {
57   - return systemContext.getPluginErrorPersistFrequency();
  70 + return systemContext.getRuleChainErrorPersistFrequency();
58 71 }
59 72 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -17,77 +17,127 @@ package org.thingsboard.server.actors.ruleChain;
17 17
18 18 import akka.actor.ActorContext;
19 19 import akka.actor.ActorRef;
  20 +import akka.actor.Props;
20 21 import akka.event.LoggingAdapter;
21   -import com.fasterxml.jackson.core.JsonProcessingException;
22 22 import org.thingsboard.server.actors.ActorSystemContext;
23   -import org.thingsboard.server.actors.plugin.*;
  23 +import org.thingsboard.server.actors.service.DefaultActorService;
24 24 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
25   -import org.thingsboard.server.common.data.id.PluginId;
  25 +import org.thingsboard.server.common.data.EntityType;
  26 +import org.thingsboard.server.common.data.id.EntityId;
26 27 import org.thingsboard.server.common.data.id.RuleChainId;
  28 +import org.thingsboard.server.common.data.id.RuleNodeId;
27 29 import org.thingsboard.server.common.data.id.TenantId;
28   -import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
29   -import org.thingsboard.server.common.data.plugin.ComponentType;
30   -import org.thingsboard.server.common.data.plugin.PluginMetaData;
  30 +import org.thingsboard.server.common.data.relation.EntityRelation;
  31 +import org.thingsboard.server.common.data.rule.RuleChain;
  32 +import org.thingsboard.server.common.data.rule.RuleNode;
  33 +import org.thingsboard.server.common.msg.TbMsg;
31 34 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
32   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
33   -import org.thingsboard.server.extensions.api.plugins.Plugin;
34   -import org.thingsboard.server.extensions.api.plugins.PluginInitializationException;
35   -import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
36   -import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
37   -import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
38   -import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
39   -import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
40   -import org.thingsboard.server.extensions.api.rules.RuleException;
  35 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
  36 +import org.thingsboard.server.dao.rule.RuleChainService;
  37 +
  38 +import java.util.ArrayList;
  39 +import java.util.HashMap;
  40 +import java.util.List;
  41 +import java.util.Map;
41 42
42 43 /**
43 44 * @author Andrew Shvayka
44 45 */
45 46 public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
46 47
47   - private ComponentLifecycleState state;
  48 + private final ActorRef parent;
  49 + private final ActorRef self;
  50 + private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
  51 + private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
  52 + private final RuleChainService service;
  53 +
  54 + private RuleNodeId firstId;
  55 + private RuleNodeCtx firstNode;
48 56
49   - protected RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId pluginId, ActorSystemContext systemContext
  57 + RuleChainActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, ActorSystemContext systemContext
50 58 , LoggingAdapter logger, ActorRef parent, ActorRef self) {
51   - super(systemContext, logger, tenantId, pluginId);
  59 + super(systemContext, logger, tenantId, ruleChainId);
  60 + this.parent = parent;
  61 + this.self = self;
  62 + this.nodeActors = new HashMap<>();
  63 + this.nodeRoutes = new HashMap<>();
  64 + this.service = systemContext.getRuleChainService();
52 65 }
53 66
54 67 @Override
55   - public void start() throws Exception {
56   -
  68 + public void start(ActorContext context) throws Exception {
  69 + RuleChain ruleChain = service.findRuleChainById(entityId);
  70 + List<RuleNode> ruleNodeList = service.getRuleChainNodes(entityId);
  71 + // Creating and starting the actors;
  72 + for (RuleNode ruleNode : ruleNodeList) {
  73 + String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ?
  74 + DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
  75 + ActorRef ruleNodeActor = context.actorOf(
  76 + Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getId()))
  77 + .withDispatcher(dispatcherName), ruleNode.toString());
  78 + nodeActors.put(ruleNode.getId(), new RuleNodeCtx(self, ruleNodeActor, ruleNode.getId()));
  79 + }
  80 + // Populating the routes map;
  81 + for (RuleNode ruleNode : ruleNodeList) {
  82 + List<EntityRelation> relations = service.getRuleNodeRelations(ruleNode.getId());
  83 + for (EntityRelation relation : relations) {
  84 + if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
  85 + RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
  86 + if (ruleNodeCtx == null) {
  87 + throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
  88 + }
  89 + }
  90 + nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
  91 + .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
  92 + }
  93 + }
  94 +
  95 + firstId = ruleChain.getFirstRuleNodeId();
  96 + firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId());
57 97 }
58 98
59 99 @Override
60   - public void stop() throws Exception {
61   -
  100 + public void stop(ActorContext context) throws Exception {
  101 + nodeActors.values().stream().map(RuleNodeCtx::getSelf).forEach(context::stop);
  102 + nodeActors.clear();
  103 + nodeRoutes.clear();
62 104 }
63 105
64 106 @Override
65   - public void onCreated(ActorContext context) throws Exception {
  107 + public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
66 108
67 109 }
68 110
69   - @Override
70   - public void onUpdate(ActorContext context) throws Exception {
71   -
  111 + void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
  112 + TbMsg tbMsg = envelope.getTbMsg();
  113 + //TODO: push to queue and act on ack in async way
  114 + pushMstToNode(firstNode, tbMsg);
72 115 }
73 116
74   - @Override
75   - public void onActivate(ActorContext context) throws Exception {
76   -
  117 + void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
  118 + RuleNodeId originator = envelope.getOriginator();
  119 + String targetRelationType = envelope.getRelationType();
  120 + //TODO: log debug output
  121 + List<RuleNodeRelation> relations = nodeRoutes.get(originator);
  122 + for (RuleNodeRelation relation : relations) {
  123 + if (targetRelationType == null || targetRelationType.equalsIgnoreCase(relation.getType())) {
  124 + switch (relation.getOut().getEntityType()) {
  125 + case RULE_NODE:
  126 + RuleNodeId targetRuleNodeId = new RuleNodeId(relation.getOut().getId());
  127 + RuleNodeCtx targetRuleNode = nodeActors.get(targetRuleNodeId);
  128 + pushMstToNode(targetRuleNode, envelope.getMsg());
  129 + break;
  130 + case RULE_CHAIN:
  131 +// TODO: implement
  132 + break;
  133 + }
  134 + }
  135 + }
77 136 }
78 137
79   - @Override
80   - public void onSuspend(ActorContext context) throws Exception {
81   -
82   - }
83   -
84   - @Override
85   - public void onStop(ActorContext context) throws Exception {
86   -
  138 + private void pushMstToNode(RuleNodeCtx nodeCtx, TbMsg msg) {
  139 + //TODO: log debug input
  140 + firstNode.getSelf().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg), self);
87 141 }
88 142
89   - @Override
90   - public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
91   -
92   - }
93 143 }
... ...
... ... @@ -8,6 +8,7 @@ import org.thingsboard.server.actors.shared.rulechain.RuleChainManager;
8 8 import org.thingsboard.server.common.data.id.EntityId;
9 9 import org.thingsboard.server.common.data.id.PluginId;
10 10 import org.thingsboard.server.common.data.id.RuleChainId;
  11 +import org.thingsboard.server.dao.rule.RuleChainService;
11 12
12 13 /**
13 14 * Created by ashvayka on 15.03.18.
... ... @@ -16,11 +17,13 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
16 17
17 18 protected final RuleChainManager ruleChainManager;
18 19 protected final PluginManager pluginManager;
  20 + protected final RuleChainService ruleChainService;
19 21
20 22 public RuleChainManagerActor(ActorSystemContext systemContext, RuleChainManager ruleChainManager, PluginManager pluginManager) {
21 23 super(systemContext);
22 24 this.ruleChainManager = ruleChainManager;
23 25 this.pluginManager = pluginManager;
  26 + this.ruleChainService = systemContext.getRuleChainService();
24 27 }
25 28
26 29 protected void initRuleChains() {
... ...
  1 +package org.thingsboard.server.actors.ruleChain;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.rule.engine.api.TbContext;
  5 +import org.thingsboard.server.common.data.id.RuleNodeId;
  6 +import org.thingsboard.server.common.msg.MsgType;
  7 +import org.thingsboard.server.common.msg.TbActorMsg;
  8 +import org.thingsboard.server.common.msg.TbMsg;
  9 +
  10 +/**
  11 + * Created by ashvayka on 19.03.18.
  12 + */
  13 +@Data
  14 +final class RuleChainToRuleNodeMsg implements TbActorMsg {
  15 +
  16 + private final TbContext ctx;
  17 + private final TbMsg msg;
  18 +
  19 + @Override
  20 + public MsgType getMsgType() {
  21 + return MsgType.RULE_CHAIN_TO_RULE_MSG;
  22 + }
  23 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.ruleChain;
  17 +
  18 +import org.thingsboard.server.actors.ActorSystemContext;
  19 +import org.thingsboard.server.actors.service.ComponentActor;
  20 +import org.thingsboard.server.actors.service.ContextBasedCreator;
  21 +import org.thingsboard.server.common.data.id.RuleChainId;
  22 +import org.thingsboard.server.common.data.id.RuleNodeId;
  23 +import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.common.msg.TbActorMsg;
  25 +
  26 +public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
  27 +
  28 + private final RuleChainId ruleChainId;
  29 +
  30 + private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
  31 + super(systemContext, tenantId, ruleNodeId);
  32 + this.ruleChainId = ruleChainId;
  33 + setProcessor(new RuleNodeActorMessageProcessor(tenantId, ruleChainId, ruleNodeId, systemContext,
  34 + logger, context().parent(), context().self()));
  35 + }
  36 +
  37 + @Override
  38 + protected void process(TbActorMsg msg) {
  39 + switch (msg.getMsgType()) {
  40 + case RULE_CHAIN_TO_RULE_MSG:
  41 + processor.onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
  42 + break;
  43 + }
  44 + }
  45 +
  46 + public static class ActorCreator extends ContextBasedCreator<RuleNodeActor> {
  47 + private static final long serialVersionUID = 1L;
  48 +
  49 + private final TenantId tenantId;
  50 + private final RuleChainId ruleChainId;
  51 + private final RuleNodeId ruleNodeId;
  52 +
  53 + public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
  54 + super(context);
  55 + this.tenantId = tenantId;
  56 + this.ruleChainId = ruleChainId;
  57 + this.ruleNodeId = ruleNodeId;
  58 +
  59 + }
  60 +
  61 + @Override
  62 + public RuleNodeActor create() throws Exception {
  63 + return new RuleNodeActor(context, tenantId, ruleChainId, ruleNodeId);
  64 + }
  65 + }
  66 +
  67 + @Override
  68 + protected long getErrorPersistFrequency() {
  69 + return systemContext.getRuleNodeErrorPersistFrequency();
  70 + }
  71 +
  72 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.ruleChain;
  17 +
  18 +import akka.actor.ActorContext;
  19 +import akka.actor.ActorRef;
  20 +import akka.actor.Props;
  21 +import akka.event.LoggingAdapter;
  22 +import org.thingsboard.server.actors.ActorSystemContext;
  23 +import org.thingsboard.server.actors.service.DefaultActorService;
  24 +import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
  25 +import org.thingsboard.server.common.data.EntityType;
  26 +import org.thingsboard.server.common.data.id.EntityId;
  27 +import org.thingsboard.server.common.data.id.RuleChainId;
  28 +import org.thingsboard.server.common.data.id.RuleNodeId;
  29 +import org.thingsboard.server.common.data.id.TenantId;
  30 +import org.thingsboard.server.common.data.relation.EntityRelation;
  31 +import org.thingsboard.server.common.data.rule.RuleChain;
  32 +import org.thingsboard.server.common.data.rule.RuleNode;
  33 +import org.thingsboard.server.common.msg.TbMsg;
  34 +import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  35 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
  36 +import org.thingsboard.server.dao.rule.RuleChainService;
  37 +
  38 +import java.util.ArrayList;
  39 +import java.util.HashMap;
  40 +import java.util.List;
  41 +import java.util.Map;
  42 +
  43 +/**
  44 + * @author Andrew Shvayka
  45 + */
  46 +public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> {
  47 +
  48 + private final ActorRef parent;
  49 + private final ActorRef self;
  50 + private final RuleChainService service;
  51 +
  52 + RuleNodeActorMessageProcessor(TenantId tenantId, RuleChainId ruleChainId, RuleNodeId ruleNodeId, ActorSystemContext systemContext
  53 + , LoggingAdapter logger, ActorRef parent, ActorRef self) {
  54 + super(systemContext, logger, tenantId, ruleNodeId);
  55 + this.parent = parent;
  56 + this.self = self;
  57 + this.service = systemContext.getRuleChainService();
  58 + }
  59 +
  60 + @Override
  61 + public void start(ActorContext context) throws Exception {
  62 +
  63 + }
  64 +
  65 + @Override
  66 + public void stop(ActorContext context) throws Exception {
  67 + }
  68 +
  69 + @Override
  70 + public void onClusterEventMsg(ClusterEventMsg msg) throws Exception {
  71 +
  72 + }
  73 +
  74 + void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) {
  75 +
  76 + }
  77 +
  78 +}
... ...
  1 +package org.thingsboard.server.actors.ruleChain;
  2 +
  3 +import akka.actor.ActorRef;
  4 +import lombok.Data;
  5 +import org.thingsboard.server.common.data.id.RuleNodeId;
  6 +
  7 +/**
  8 + * Created by ashvayka on 19.03.18.
  9 + */
  10 +@Data
  11 +final class RuleNodeCtx {
  12 + private final ActorRef chainActor;
  13 + private final ActorRef self;
  14 + private final RuleNodeId selfId;
  15 +}
... ...
  1 +package org.thingsboard.server.actors.ruleChain;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.server.common.data.id.EntityId;
  5 +
  6 +/**
  7 + * Created by ashvayka on 19.03.18.
  8 + */
  9 +
  10 +@Data
  11 +final class RuleNodeRelation {
  12 +
  13 + private final EntityId in;
  14 + private final EntityId out;
  15 + private final String type;
  16 +
  17 +}
... ...
  1 +package org.thingsboard.server.actors.ruleChain;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.server.common.data.id.RuleNodeId;
  5 +import org.thingsboard.server.common.msg.MsgType;
  6 +import org.thingsboard.server.common.msg.TbActorMsg;
  7 +import org.thingsboard.server.common.msg.TbMsg;
  8 +
  9 +/**
  10 + * Created by ashvayka on 19.03.18.
  11 + */
  12 +@Data
  13 +final class RuleNodeToRuleChainTellNextMsg implements TbActorMsg {
  14 +
  15 + private final RuleNodeId originator;
  16 + private final String relationType;
  17 + private final TbMsg msg;
  18 +
  19 + @Override
  20 + public MsgType getMsgType() {
  21 + return MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG;
  22 + }
  23 +
  24 +}
... ...
... ... @@ -17,6 +17,8 @@ package org.thingsboard.server.actors.service;
17 17
18 18 import org.thingsboard.server.common.data.id.*;
19 19 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
  20 +import org.thingsboard.server.common.msg.TbMsg;
  21 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
20 22 import org.thingsboard.server.common.transport.SessionMsgProcessor;
21 23 import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
22 24 import org.thingsboard.server.service.cluster.rpc.RpcMsgListener;
... ... @@ -25,6 +27,8 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
25 27
26 28 void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
27 29
  30 + void onMsg(ServiceToRuleEngineMsg msg);
  31 +
28 32 void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
29 33
30 34 void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
... ...
... ... @@ -54,7 +54,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
54 54 @Override
55 55 public void preStart() {
56 56 try {
57   - processor.start();
  57 + processor.start(context());
58 58 logLifecycleEvent(ComponentLifecycleEvent.STARTED);
59 59 if (systemContext.isStatisticsEnabled()) {
60 60 scheduleStatsPersistTick();
... ... @@ -78,7 +78,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
78 78 @Override
79 79 public void postStop() {
80 80 try {
81   - processor.stop();
  81 + processor.stop(context());
82 82 logLifecycleEvent(ComponentLifecycleEvent.STOPPED);
83 83 } catch (Exception e) {
84 84 logger.warning("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage());
... ...
... ... @@ -16,9 +16,13 @@
16 16 package org.thingsboard.server.actors.service;
17 17
18 18 import akka.actor.UntypedActor;
  19 +import akka.event.Logging;
  20 +import akka.event.LoggingAdapter;
19 21 import org.thingsboard.server.actors.ActorSystemContext;
  22 +import org.thingsboard.server.common.msg.TbActorMsg;
20 23
21 24 public abstract class ContextAwareActor extends UntypedActor {
  25 + protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
22 26
23 27 public static final int ENTITY_PACK_LIMIT = 1024;
24 28
... ... @@ -28,4 +32,19 @@ public abstract class ContextAwareActor extends UntypedActor {
28 32 super();
29 33 this.systemContext = systemContext;
30 34 }
  35 +
  36 + @Override
  37 + public void onReceive(Object msg) throws Exception {
  38 + if (logger.isDebugEnabled()) {
  39 + logger.debug("Processing msg: {}", msg);
  40 + }
  41 + if (msg instanceof TbActorMsg) {
  42 + process((TbActorMsg) msg);
  43 + }
  44 + else {
  45 + logger.warning("Unknown message: {}!", msg);
  46 + }
  47 + }
  48 +
  49 + protected abstract void process(TbActorMsg msg);
31 50 }
... ...
... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
37 37 import org.thingsboard.server.common.msg.cluster.ServerAddress;
38 38 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
39 39 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
  40 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
40 41 import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
41 42 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
42 43 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
... ... @@ -126,6 +127,11 @@ public class DefaultActorService implements ActorService {
126 127 }
127 128
128 129 @Override
  130 + public void onMsg(ServiceToRuleEngineMsg msg) {
  131 + appActor.tell(msg, ActorRef.noSender());
  132 + }
  133 +
  134 + @Override
129 135 public void process(SessionAwareMsg msg) {
130 136 log.debug("Processing session aware msg: {}", msg);
131 137 sessionManagerActor.tell(msg, ActorRef.noSender());
... ...
... ... @@ -102,9 +102,6 @@ public abstract class AbstractContextAwareMsgProcessor {
102 102 case FILTER:
103 103 configurationClazz = ((Filter) componentClazz.getAnnotation(Filter.class)).configuration();
104 104 break;
105   - case PROCESSOR:
106   - configurationClazz = ((Processor) componentClazz.getAnnotation(Processor.class)).configuration();
107   - break;
108 105 case ACTION:
109 106 configurationClazz = ((Action) componentClazz.getAnnotation(Action.class)).configuration();
110 107 break;
... ...
... ... @@ -33,21 +33,36 @@ public abstract class ComponentMsgProcessor<T> extends AbstractContextAwareMsgPr
33 33 this.entityId = id;
34 34 }
35 35
36   - public abstract void start() throws Exception;
  36 + public abstract void start(ActorContext context) throws Exception;
37 37
38   - public abstract void stop() throws Exception;
  38 + public abstract void stop(ActorContext context) throws Exception;
39 39
40   - public abstract void onCreated(ActorContext context) throws Exception;
  40 + public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
41 41
42   - public abstract void onUpdate(ActorContext context) throws Exception;
  42 + public void onCreated(ActorContext context) throws Exception {
  43 + start(context);
  44 + }
43 45
44   - public abstract void onActivate(ActorContext context) throws Exception;
  46 + public void onUpdate(ActorContext context) throws Exception {
  47 + restart(context);
  48 + }
45 49
46   - public abstract void onSuspend(ActorContext context) throws Exception;
  50 + public void onActivate(ActorContext context) throws Exception {
  51 + restart(context);
  52 + }
47 53
48   - public abstract void onStop(ActorContext context) throws Exception;
  54 + public void onSuspend(ActorContext context) throws Exception {
  55 + stop(context);
  56 + }
49 57
50   - public abstract void onClusterEventMsg(ClusterEventMsg msg) throws Exception;
  58 + public void onStop(ActorContext context) throws Exception {
  59 + stop(context);
  60 + }
  61 +
  62 + private void restart(ActorContext context) throws Exception {
  63 + stop(context);
  64 + start(context);
  65 + }
51 66
52 67 public void scheduleStatsPersistTick(ActorContext context, long statsPersistFrequency) {
53 68 schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency);
... ...
... ... @@ -43,13 +43,16 @@ public abstract class EntityActorsManager<T extends EntityId, A extends UntypedA
43 43
44 44 public void init(ActorContext context) {
45 45 for (M entity : new PageDataIterable<>(getFetchEntitiesFunction(), ContextAwareActor.ENTITY_PACK_LIMIT)) {
46   - log.debug("[{}] Creating plugin actor", entity.getId());
  46 + T entityId = (T) entity.getId();
  47 + log.debug("[{}|{}] Creating entity actor", entityId.getEntityType(), entityId.getId());
47 48 //TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa.
48   - getOrCreateActor(context, (T) entity.getId());
49   - log.debug("[{}] Plugin actor created.", entity.getId());
  49 + ActorRef actorRef = getOrCreateActor(context, entityId);
  50 + visit(entity, actorRef);
  51 + log.debug("[{}|{}] Entity actor created.", entityId.getEntityType(), entityId.getId());
50 52 }
51 53 }
52 54
  55 + protected void visit(M entity, ActorRef actorRef) {}
53 56
54 57 public ActorRef getOrCreateActor(ActorContext context, T entityId) {
55 58 return actors.computeIfAbsent(entityId, eId ->
... ...
1 1 package org.thingsboard.server.actors.shared.rulechain;
2 2
  3 +import akka.actor.ActorRef;
3 4 import akka.japi.Creator;
  5 +import lombok.Getter;
4 6 import lombok.extern.slf4j.Slf4j;
5 7 import org.thingsboard.server.actors.ActorSystemContext;
6 8 import org.thingsboard.server.actors.ruleChain.RuleChainActor;
... ... @@ -16,6 +18,10 @@ import org.thingsboard.server.dao.rule.RuleChainService;
16 18 public abstract class RuleChainManager extends EntityActorsManager<RuleChainId, RuleChainActor, RuleChain> {
17 19
18 20 protected final RuleChainService service;
  21 + @Getter
  22 + protected RuleChain rootChain;
  23 + @Getter
  24 + protected ActorRef rootChainActor;
19 25
20 26 public RuleChainManager(ActorSystemContext systemContext) {
21 27 super(systemContext);
... ... @@ -27,4 +33,12 @@ public abstract class RuleChainManager extends EntityActorsManager<RuleChainId,
27 33 return new RuleChainActor.ActorCreator(systemContext, getTenantId(), entityId);
28 34 }
29 35
  36 + @Override
  37 + protected void visit(RuleChain entity, ActorRef actorRef) {
  38 + if (entity.isRoot()) {
  39 + rootChain = entity;
  40 + rootChainActor = actorRef;
  41 + }
  42 + }
  43 +
30 44 }
... ...
... ... @@ -27,10 +27,12 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
27 27 import org.thingsboard.server.actors.service.DefaultActorService;
28 28 import org.thingsboard.server.actors.shared.plugin.TenantPluginManager;
29 29 import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager;
30   -import org.thingsboard.server.common.data.id.*;
31   -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  30 +import org.thingsboard.server.common.data.id.DeviceId;
  31 +import org.thingsboard.server.common.data.id.TenantId;
  32 +import org.thingsboard.server.common.msg.TbActorMsg;
32 33 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
33 34 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
  35 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
34 36 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
35 37 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
36 38
... ... @@ -39,8 +41,6 @@ import java.util.Map;
39 41
40 42 public class TenantActor extends RuleChainManagerActor {
41 43
42   - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
43   -
44 44 private final TenantId tenantId;
45 45 private final Map<DeviceId, ActorRef> deviceActors;
46 46
... ... @@ -62,25 +62,42 @@ public class TenantActor extends RuleChainManagerActor {
62 62 }
63 63
64 64 @Override
65   - public void onReceive(Object msg) throws Exception {
66   - logger.debug("[{}] Received message: {}", tenantId, msg);
67   - if (msg instanceof ToDeviceActorMsg) {
68   - onToDeviceActorMsg((ToDeviceActorMsg) msg);
69   - } else if (msg instanceof ToPluginActorMsg) {
70   - onToPluginMsg((ToPluginActorMsg) msg);
71   - } else if (msg instanceof ToDeviceActorNotificationMsg) {
72   - onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
73   - } else if (msg instanceof ClusterEventMsg) {
74   - broadcast(msg);
75   - } else if (msg instanceof ComponentLifecycleMsg) {
76   - onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
77   - } else if (msg instanceof PluginTerminationMsg) {
78   - onPluginTerminated((PluginTerminationMsg) msg);
79   - } else {
80   - logger.warning("[{}] Unknown message: {}!", tenantId, msg);
  65 + protected void process(TbActorMsg msg) {
  66 + switch (msg.getMsgType()) {
  67 + case COMPONENT_LIFE_CYCLE_MSG:
  68 + onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
  69 + break;
  70 + case SERVICE_TO_RULE_ENGINE_MSG:
  71 + onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
  72 + break;
81 73 }
82 74 }
83 75
  76 + private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
  77 + ruleChainManager.getRootChainActor().tell(msg, self());
  78 + }
  79 +
  80 +
  81 +// @Override
  82 +// public void onReceive(Object msg) throws Exception {
  83 +// logger.debug("[{}] Received message: {}", tenantId, msg);
  84 +// if (msg instanceof ToDeviceActorMsg) {
  85 +// onToDeviceActorMsg((ToDeviceActorMsg) msg);
  86 +// } else if (msg instanceof ToPluginActorMsg) {
  87 +// onToPluginMsg((ToPluginActorMsg) msg);
  88 +// } else if (msg instanceof ToDeviceActorNotificationMsg) {
  89 +// onToDeviceActorMsg((ToDeviceActorNotificationMsg) msg);
  90 +// } else if (msg instanceof ClusterEventMsg) {
  91 +// broadcast(msg);
  92 +// } else if (msg instanceof ComponentLifecycleMsg) {
  93 +// onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
  94 +// } else if (msg instanceof PluginTerminationMsg) {
  95 +// onPluginTerminated((PluginTerminationMsg) msg);
  96 +// } else {
  97 +// logger.warning("[{}] Unknown message: {}!", tenantId, msg);
  98 +// }
  99 +// }
  100 +
84 101 private void broadcast(Object msg) {
85 102 pluginManager.broadcast(msg);
86 103 deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
... ...
... ... @@ -215,6 +215,12 @@ actors:
215 215 termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
216 216 # Errors for particular actor are persisted once per specified amount of milliseconds
217 217 error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
  218 + chain:
  219 + # Errors for particular actor are persisted once per specified amount of milliseconds
  220 + error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"
  221 + node:
  222 + # Errors for particular actor are persisted once per specified amount of milliseconds
  223 + error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
218 224 statistics:
219 225 # Enable/disable actor statistics
220 226 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
... ...
... ... @@ -37,6 +37,8 @@ public class DataConstants {
37 37 public static final String ERROR = "ERROR";
38 38 public static final String LC_EVENT = "LC_EVENT";
39 39 public static final String STATS = "STATS";
  40 + public static final String RULE_CHAIN_DEBUG = "DEBUG_RULE_CHAIN";
  41 + public static final String RULE_NODE_DEBUG = "DEBUG_RULE_NODE";
40 42
41 43 public static final String ONEWAY = "ONEWAY";
42 44 public static final String TWOWAY = "TWOWAY";
... ...
... ... @@ -20,6 +20,6 @@ package org.thingsboard.server.common.data.plugin;
20 20 */
21 21 public enum ComponentType {
22 22
23   - FILTER, PROCESSOR, ACTION, PLUGIN
  23 + ENRICHMENT, FILTER, PROCESSOR, TRANSFORMATION, ACTION, PLUGIN
24 24
25 25 }
... ...
... ... @@ -4,4 +4,34 @@ package org.thingsboard.server.common.msg;
4 4 * Created by ashvayka on 15.03.18.
5 5 */
6 6 public enum MsgType {
  7 +
  8 + /**
  9 + * ADDED/UPDATED/DELETED events for main entities.
  10 + *
  11 + * @See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
  12 + */
  13 + COMPONENT_LIFE_CYCLE_MSG,
  14 +
  15 + /**
  16 + * Misc messages from the REST API/SERVICE layer to the new rule engine.
  17 + *
  18 + * @See {@link org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg}
  19 + */
  20 + SERVICE_TO_RULE_ENGINE_MSG,
  21 +
  22 +
  23 + SESSION_TO_DEVICE_ACTOR_MSG,
  24 + DEVICE_ACTOR_TO_SESSION_MSG,
  25 +
  26 +
  27 + /**
  28 + * Message that is sent by RuleChainActor to RuleActor with command to process TbMsg.
  29 + */
  30 + RULE_CHAIN_TO_RULE_MSG,
  31 +
  32 + /**
  33 + * Message that is sent by RuleActor to RuleChainActor with command to process TbMsg by next nodes in chain.
  34 + */
  35 + RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,
  36 +
7 37 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -36,7 +36,7 @@ public final class TbMsg implements Serializable {
36 36 private final String type;
37 37 private final EntityId originator;
38 38 private final TbMsgMetaData metaData;
39   -
  39 + private final TbMsgDataType dataType;
40 40 private final byte[] data;
41 41
42 42 public static ByteBuffer toBytes(TbMsg msg) {
... ... @@ -49,11 +49,10 @@ public final class TbMsg implements Serializable {
49 49 }
50 50
51 51 if (msg.getMetaData() != null) {
52   - MsgProtos.TbMsgProto.TbMsgMetaDataProto.Builder metadataBuilder = MsgProtos.TbMsgProto.TbMsgMetaDataProto.newBuilder();
53   - metadataBuilder.putAllData(msg.getMetaData().getData());
54   - builder.addMetaData(metadataBuilder.build());
  52 + builder.setMetaData(MsgProtos.TbMsgMetaDataProto.newBuilder().putAllData(msg.getMetaData().getData()).build());
55 53 }
56 54
  55 + builder.setDataType(msg.getDataType().ordinal());
57 56 builder.setData(ByteString.copyFrom(msg.getData()));
58 57 byte[] bytes = builder.build().toByteArray();
59 58 return ByteBuffer.wrap(bytes);
... ... @@ -63,16 +62,11 @@ public final class TbMsg implements Serializable {
63 62 try {
64 63 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
65 64 TbMsgMetaData metaData = new TbMsgMetaData();
66   - if (proto.getMetaDataCount() > 0) {
67   - metaData.setData(proto.getMetaData(0).getDataMap());
68   - }
69   -
70   - EntityId entityId = null;
71   - if (proto.getEntityId() != null) {
72   - entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
73   - }
  65 + metaData.setData(proto.getMetaData().getDataMap());
74 66
75   - return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, proto.getData().toByteArray());
  67 + EntityId entityId = EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId());
  68 + TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
  69 + return new TbMsg(UUID.fromString(proto.getId()), proto.getType(), entityId, metaData, dataType, proto.getData().toByteArray());
76 70 } catch (InvalidProtocolBufferException e) {
77 71 throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
78 72 }
... ...
  1 +package org.thingsboard.server.common.msg;
  2 +
  3 +/**
  4 + * Created by ashvayka on 15.03.18.
  5 + */
  6 +public enum TbMsgDataType {
  7 +
  8 + // Do not change ordering. We use ordinal to save some bytes on serialization
  9 + JSON, TEXT, BINARY;
  10 +
  11 +}
... ...
... ... @@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.EntityType;
21 21 import org.thingsboard.server.common.data.id.*;
22 22 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
23 23 import org.thingsboard.server.common.data.rule.RuleChain;
  24 +import org.thingsboard.server.common.msg.MsgType;
  25 +import org.thingsboard.server.common.msg.TbActorMsg;
24 26 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
25 27 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
26 28
... ... @@ -30,7 +32,7 @@ import java.util.Optional;
30 32 * @author Andrew Shvayka
31 33 */
32 34 @ToString
33   -public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
  35 +public class ComponentLifecycleMsg implements TbActorMsg, TenantAwareMsg, ToAllNodesMsg {
34 36 @Getter
35 37 private final TenantId tenantId;
36 38 @Getter
... ... @@ -56,4 +58,8 @@ public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
56 58 return entityId.getEntityType() == EntityType.RULE_CHAIN ? Optional.of((RuleChainId) entityId) : Optional.empty();
57 59 }
58 60
  61 + @Override
  62 + public MsgType getMsgType() {
  63 + return MsgType.COMPONENT_LIFE_CYCLE_MSG;
  64 + }
59 65 }
... ...
  1 +package org.thingsboard.server.common.msg.system;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.server.common.data.id.TenantId;
  5 +import org.thingsboard.server.common.msg.MsgType;
  6 +import org.thingsboard.server.common.msg.TbActorMsg;
  7 +import org.thingsboard.server.common.msg.TbMsg;
  8 +
  9 +/**
  10 + * Created by ashvayka on 15.03.18.
  11 + */
  12 +@Data
  13 +public final class ServiceToRuleEngineMsg implements TbActorMsg {
  14 +
  15 + private final TenantId tenantId;
  16 + private final TbMsg tbMsg;
  17 +
  18 + @Override
  19 + public MsgType getMsgType() {
  20 + return MsgType.SERVICE_TO_RULE_ENGINE_MSG;
  21 + }
  22 +}
... ...
... ... @@ -19,6 +19,9 @@ package msgqueue;
19 19 option java_package = "org.thingsboard.server.common.msg.gen";
20 20 option java_outer_classname = "MsgProtos";
21 21
  22 +message TbMsgMetaDataProto {
  23 + map<string, string> data = 1;
  24 +}
22 25
23 26 message TbMsgProto {
24 27 string id = 1;
... ... @@ -26,11 +29,8 @@ message TbMsgProto {
26 29 string entityType = 3;
27 30 string entityId = 4;
28 31
29   - message TbMsgMetaDataProto {
30   - map<string, string> data = 1;
31   - }
  32 + TbMsgMetaDataProto metaData = 5;
32 33
33   - repeated TbMsgMetaDataProto metaData = 5;
34   -
35   - bytes data = 6;
  34 + int32 dataType = 6;
  35 + bytes data = 7;
36 36 }
\ No newline at end of file
... ...
... ... @@ -16,7 +16,6 @@
16 16 package org.thingsboard.server.dao.rule;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19   -import com.fasterxml.jackson.databind.node.ArrayNode;
20 19 import com.google.common.util.concurrent.ListenableFuture;
21 20 import lombok.extern.slf4j.Slf4j;
22 21 import org.apache.commons.lang3.StringUtils;
... ... @@ -67,67 +66,7 @@ public class BaseRuleService extends AbstractEntityService implements RuleServic
67 66
68 67 @Override
69 68 public RuleMetaData saveRule(RuleMetaData rule) {
70   - ruleValidator.validate(rule);
71   - if (rule.getTenantId() == null) {
72   - log.trace("Save system rule metadata with predefined id {}", systemTenantId);
73   - rule.setTenantId(systemTenantId);
74   - }
75   - if (rule.getId() != null) {
76   - RuleMetaData oldVersion = ruleDao.findById(rule.getId());
77   - if (rule.getState() == null) {
78   - rule.setState(oldVersion.getState());
79   - } else if (rule.getState() != oldVersion.getState()) {
80   - throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
81   - }
82   - } else {
83   - if (rule.getState() == null) {
84   - rule.setState(ComponentLifecycleState.SUSPENDED);
85   - } else if (rule.getState() != ComponentLifecycleState.SUSPENDED) {
86   - throw new IncorrectParameterException("Use Activate/Suspend method to control state of the rule!");
87   - }
88   - }
89   -
90   - validateFilters(rule.getFilters());
91   - if (rule.getProcessor() != null && !rule.getProcessor().isNull()) {
92   - validateComponentJson(rule.getProcessor(), ComponentType.PROCESSOR);
93   - }
94   - if (rule.getAction() != null && !rule.getAction().isNull()) {
95   - validateComponentJson(rule.getAction(), ComponentType.ACTION);
96   - }
97   - validateRuleAndPluginState(rule);
98   - return ruleDao.save(rule);
99   - }
100   -
101   - private void validateFilters(JsonNode filtersJson) {
102   - if (filtersJson == null || filtersJson.isNull()) {
103   - throw new IncorrectParameterException("Rule filters are required!");
104   - }
105   - if (!filtersJson.isArray()) {
106   - throw new IncorrectParameterException("Filters json is not an array!");
107   - }
108   - ArrayNode filtersArray = (ArrayNode) filtersJson;
109   - for (int i = 0; i < filtersArray.size(); i++) {
110   - validateComponentJson(filtersArray.get(i), ComponentType.FILTER);
111   - }
112   - }
113   -
114   - private void validateComponentJson(JsonNode json, ComponentType type) {
115   - if (json == null || json.isNull()) {
116   - throw new IncorrectParameterException(type.name() + " is required!");
117   - }
118   - String clazz = getIfValid(type.name(), json, "clazz", JsonNode::isTextual, JsonNode::asText);
119   - String name = getIfValid(type.name(), json, "name", JsonNode::isTextual, JsonNode::asText);
120   - JsonNode configuration = getIfValid(type.name(), json, "configuration", JsonNode::isObject, node -> node);
121   - ComponentDescriptor descriptor = componentDescriptorService.findByClazz(clazz);
122   - if (descriptor == null) {
123   - throw new IncorrectParameterException(type.name() + " clazz " + clazz + " is not a valid component!");
124   - }
125   - if (descriptor.getType() != type) {
126   - throw new IncorrectParameterException("Clazz " + clazz + " is not a valid " + type.name() + " component!");
127   - }
128   - if (!componentDescriptorService.validate(descriptor, configuration)) {
129   - throw new IncorrectParameterException(type.name() + " configuration is not valid!");
130   - }
  69 + throw new RuntimeException("Not supported since v1.5!");
131 70 }
132 71
133 72 private void validateRuleAndPluginState(RuleMetaData rule) {
... ...
... ... @@ -38,7 +38,7 @@ public interface TbContext {
38 38
39 39 void spawn(TbMsg msg);
40 40
41   - void ack(UUID msg);
  41 + void ack(TbMsg msg);
42 42
43 43 AttributesService getAttributesService();
44 44
... ...
... ... @@ -44,6 +44,7 @@ public class TbTransformNode implements TbNode {
44 44 try {
45 45 //TODO: refactor this to work async and fetch attributes from cache.
46 46 AttributesService service = ctx.getAttributesService();
  47 +
47 48 fetchAttributes(msg, service, config.getClientAttributeNames(), DataConstants.CLIENT_SCOPE, "cs.");
48 49 fetchAttributes(msg, service, config.getServerAttributeNames(), DataConstants.SERVER_SCOPE, "ss.");
49 50 fetchAttributes(msg, service, config.getSharedAttributeNames(), DataConstants.SHARED_SCOPE, "shared.");
... ...