Showing
6 changed files
with
32 additions
and
8 deletions
@@ -34,9 +34,11 @@ import org.thingsboard.server.actors.service.ContextBasedCreator; | @@ -34,9 +34,11 @@ import org.thingsboard.server.actors.service.ContextBasedCreator; | ||
34 | import org.thingsboard.server.actors.service.DefaultActorService; | 34 | import org.thingsboard.server.actors.service.DefaultActorService; |
35 | import org.thingsboard.server.actors.shared.rulechain.SystemRuleChainManager; | 35 | import org.thingsboard.server.actors.shared.rulechain.SystemRuleChainManager; |
36 | import org.thingsboard.server.actors.tenant.TenantActor; | 36 | import org.thingsboard.server.actors.tenant.TenantActor; |
37 | +import org.thingsboard.server.common.data.EntityType; | ||
37 | import org.thingsboard.server.common.data.Tenant; | 38 | import org.thingsboard.server.common.data.Tenant; |
38 | import org.thingsboard.server.common.data.id.TenantId; | 39 | import org.thingsboard.server.common.data.id.TenantId; |
39 | import org.thingsboard.server.common.data.page.PageDataIterable; | 40 | import org.thingsboard.server.common.data.page.PageDataIterable; |
41 | +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; | ||
40 | import org.thingsboard.server.common.msg.TbActorMsg; | 42 | import org.thingsboard.server.common.msg.TbActorMsg; |
41 | import org.thingsboard.server.common.msg.aware.TenantAwareMsg; | 43 | import org.thingsboard.server.common.msg.aware.TenantAwareMsg; |
42 | import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; | 44 | import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; |
@@ -129,7 +131,7 @@ public class AppActor extends RuleChainManagerActor { | @@ -129,7 +131,7 @@ public class AppActor extends RuleChainManagerActor { | ||
129 | 131 | ||
130 | private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) { | 132 | private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) { |
131 | if (SYSTEM_TENANT.equals(msg.getTenantId())) { | 133 | if (SYSTEM_TENANT.equals(msg.getTenantId())) { |
132 | - log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet", SYSTEM_TENANT); | 134 | + log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet: {}", SYSTEM_TENANT, msg); |
133 | } else { | 135 | } else { |
134 | getOrCreateTenantActor(msg.getTenantId()).tell(msg, self()); | 136 | getOrCreateTenantActor(msg.getTenantId()).tell(msg, self()); |
135 | } | 137 | } |
@@ -142,11 +144,21 @@ public class AppActor extends RuleChainManagerActor { | @@ -142,11 +144,21 @@ public class AppActor extends RuleChainManagerActor { | ||
142 | } | 144 | } |
143 | 145 | ||
144 | private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { | 146 | private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { |
145 | - ActorRef target; | 147 | + ActorRef target = null; |
146 | if (SYSTEM_TENANT.equals(msg.getTenantId())) { | 148 | if (SYSTEM_TENANT.equals(msg.getTenantId())) { |
147 | target = getEntityActorRef(msg.getEntityId()); | 149 | target = getEntityActorRef(msg.getEntityId()); |
148 | } else { | 150 | } else { |
149 | - target = getOrCreateTenantActor(msg.getTenantId()); | 151 | + if (msg.getEntityId().getEntityType() == EntityType.TENANT |
152 | + && msg.getEvent() == ComponentLifecycleEvent.DELETED) { | ||
153 | + log.debug("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); | ||
154 | + ActorRef tenantActor = tenantActors.remove(new TenantId(msg.getEntityId().getId())); | ||
155 | + if (tenantActor != null) { | ||
156 | + log.debug("[{}] Deleting tenant actor: {}", msg.getTenantId(), tenantActor); | ||
157 | + context().stop(tenantActor); | ||
158 | + } | ||
159 | + } else { | ||
160 | + target = getOrCreateTenantActor(msg.getTenantId()); | ||
161 | + } | ||
150 | } | 162 | } |
151 | if (target != null) { | 163 | if (target != null) { |
152 | target.tell(msg, ActorRef.noSender()); | 164 | target.tell(msg, ActorRef.noSender()); |
@@ -15,6 +15,7 @@ | @@ -15,6 +15,7 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.actors.ruleChain; | 16 | package org.thingsboard.server.actors.ruleChain; |
17 | 17 | ||
18 | +import akka.actor.ActorInitializationException; | ||
18 | import akka.actor.OneForOneStrategy; | 19 | import akka.actor.OneForOneStrategy; |
19 | import akka.actor.SupervisorStrategy; | 20 | import akka.actor.SupervisorStrategy; |
20 | import org.thingsboard.server.actors.ActorSystemContext; | 21 | import org.thingsboard.server.actors.ActorSystemContext; |
@@ -96,7 +96,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | @@ -96,7 +96,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | ||
96 | log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size()); | 96 | log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size()); |
97 | // Creating and starting the actors; | 97 | // Creating and starting the actors; |
98 | for (RuleNode ruleNode : ruleNodeList) { | 98 | for (RuleNode ruleNode : ruleNodeList) { |
99 | - log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode); | 99 | + log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode); |
100 | ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); | 100 | ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); |
101 | nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); | 101 | nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); |
102 | } | 102 | } |
@@ -116,11 +116,11 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | @@ -116,11 +116,11 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | ||
116 | for (RuleNode ruleNode : ruleNodeList) { | 116 | for (RuleNode ruleNode : ruleNodeList) { |
117 | RuleNodeCtx existing = nodeActors.get(ruleNode.getId()); | 117 | RuleNodeCtx existing = nodeActors.get(ruleNode.getId()); |
118 | if (existing == null) { | 118 | if (existing == null) { |
119 | - log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode); | 119 | + log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode); |
120 | ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); | 120 | ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); |
121 | nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); | 121 | nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); |
122 | } else { | 122 | } else { |
123 | - log.trace("[{}][{}] Updating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode); | 123 | + log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode); |
124 | existing.setSelf(ruleNode); | 124 | existing.setSelf(ruleNode); |
125 | existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self); | 125 | existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self); |
126 | } | 126 | } |
@@ -184,13 +184,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | @@ -184,13 +184,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh | ||
184 | } | 184 | } |
185 | 185 | ||
186 | firstId = ruleChain.getFirstRuleNodeId(); | 186 | firstId = ruleChain.getFirstRuleNodeId(); |
187 | - firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId()); | 187 | + firstNode = nodeActors.get(firstId); |
188 | state = ComponentLifecycleState.ACTIVE; | 188 | state = ComponentLifecycleState.ACTIVE; |
189 | } | 189 | } |
190 | 190 | ||
191 | void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) { | 191 | void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) { |
192 | + log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, envelope.getTbMsg().getId(), envelope.getTbMsg()); | ||
192 | checkActive(); | 193 | checkActive(); |
193 | if (firstNode != null) { | 194 | if (firstNode != null) { |
195 | + log.trace("[{}][{}] Pushing message to first rule node", entityId, firstId); | ||
194 | pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), ""); | 196 | pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), ""); |
195 | } | 197 | } |
196 | } | 198 | } |
@@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.EntityType; | @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.EntityType; | ||
37 | import org.thingsboard.server.common.data.id.DeviceId; | 37 | import org.thingsboard.server.common.data.id.DeviceId; |
38 | import org.thingsboard.server.common.data.id.RuleChainId; | 38 | import org.thingsboard.server.common.data.id.RuleChainId; |
39 | import org.thingsboard.server.common.data.id.TenantId; | 39 | import org.thingsboard.server.common.data.id.TenantId; |
40 | +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; | ||
40 | import org.thingsboard.server.common.data.rule.RuleChain; | 41 | import org.thingsboard.server.common.data.rule.RuleChain; |
41 | import org.thingsboard.server.common.msg.TbActorMsg; | 42 | import org.thingsboard.server.common.msg.TbActorMsg; |
42 | import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; | 43 | import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; |
@@ -76,6 +77,11 @@ public class TenantActor extends RuleChainManagerActor { | @@ -76,6 +77,11 @@ public class TenantActor extends RuleChainManagerActor { | ||
76 | } | 77 | } |
77 | 78 | ||
78 | @Override | 79 | @Override |
80 | + public void postStop() { | ||
81 | + log.info("[{}] Stopping tenant actor.", tenantId); | ||
82 | + } | ||
83 | + | ||
84 | + @Override | ||
79 | protected boolean process(TbActorMsg msg) { | 85 | protected boolean process(TbActorMsg msg) { |
80 | switch (msg.getMsgType()) { | 86 | switch (msg.getMsgType()) { |
81 | case CLUSTER_EVENT_MSG: | 87 | case CLUSTER_EVENT_MSG: |
@@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; | @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; | ||
32 | import org.thingsboard.server.common.data.id.TenantId; | 32 | import org.thingsboard.server.common.data.id.TenantId; |
33 | import org.thingsboard.server.common.data.page.TextPageData; | 33 | import org.thingsboard.server.common.data.page.TextPageData; |
34 | import org.thingsboard.server.common.data.page.TextPageLink; | 34 | import org.thingsboard.server.common.data.page.TextPageLink; |
35 | +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; | ||
35 | import org.thingsboard.server.dao.tenant.TenantService; | 36 | import org.thingsboard.server.dao.tenant.TenantService; |
36 | import org.thingsboard.server.service.install.InstallScripts; | 37 | import org.thingsboard.server.service.install.InstallScripts; |
37 | 38 | ||
@@ -84,6 +85,8 @@ public class TenantController extends BaseController { | @@ -84,6 +85,8 @@ public class TenantController extends BaseController { | ||
84 | try { | 85 | try { |
85 | TenantId tenantId = new TenantId(toUUID(strTenantId)); | 86 | TenantId tenantId = new TenantId(toUUID(strTenantId)); |
86 | tenantService.deleteTenant(tenantId); | 87 | tenantService.deleteTenant(tenantId); |
88 | + | ||
89 | + actorService.onEntityStateChange(tenantId, tenantId, ComponentLifecycleEvent.DELETED); | ||
87 | } catch (Exception e) { | 90 | } catch (Exception e) { |
88 | throw handleException(e); | 91 | throw handleException(e); |
89 | } | 92 | } |
@@ -119,10 +119,10 @@ public class GatewaySessionHandler { | @@ -119,10 +119,10 @@ public class GatewaySessionHandler { | ||
119 | GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); | 119 | GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); |
120 | if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { | 120 | if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { |
121 | SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); | 121 | SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); |
122 | + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); | ||
122 | transportService.process(deviceSessionInfo, AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); | 123 | transportService.process(deviceSessionInfo, AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); |
123 | transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); | 124 | transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); |
124 | transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); | 125 | transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); |
125 | - transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); | ||
126 | } | 126 | } |
127 | future.set(devices.get(deviceName)); | 127 | future.set(devices.get(deviceName)); |
128 | } | 128 | } |