Commit b8b55a98670a2d5d9301029e751e1e346a640de9

Authored by deaflynx
2 parents 9352e191 ce746b4f

Merge remote-tracking branch 'origin/feature/edge' into feature/edge

@@ -66,6 +66,7 @@ import org.thingsboard.server.dao.user.UserService; @@ -66,6 +66,7 @@ import org.thingsboard.server.dao.user.UserService;
66 import org.thingsboard.server.queue.discovery.PartitionService; 66 import org.thingsboard.server.queue.discovery.PartitionService;
67 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; 67 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
68 import org.thingsboard.server.service.component.ComponentDiscoveryService; 68 import org.thingsboard.server.service.component.ComponentDiscoveryService;
  69 +import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
69 import org.thingsboard.server.service.encoding.DataDecodingEncodingService; 70 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
70 import org.thingsboard.server.service.executors.DbCallbackExecutorService; 71 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
71 import org.thingsboard.server.service.executors.ExternalCallExecutorService; 72 import org.thingsboard.server.service.executors.ExternalCallExecutorService;
@@ -254,15 +255,14 @@ public class ActorSystemContext { @@ -254,15 +255,14 @@ public class ActorSystemContext {
254 @Getter 255 @Getter
255 private TbCoreDeviceRpcService tbCoreDeviceRpcService; 256 private TbCoreDeviceRpcService tbCoreDeviceRpcService;
256 257
257 - @Lazy  
258 - @Autowired  
259 - @Getter  
260 - private EdgeService edgeService; 258 + @Autowired(required = false)
  259 + @Getter private EdgeService edgeService;
261 260
262 - @Lazy  
263 - @Autowired  
264 - @Getter  
265 - private EdgeEventService edgeEventService; 261 + @Autowired(required = false)
  262 + @Getter private EdgeEventService edgeEventService;
  263 +
  264 + @Autowired(required = false)
  265 + @Getter private EdgeRpcService edgeRpcService;
266 266
267 @Value("${actors.session.max_concurrent_sessions_per_device:1}") 267 @Value("${actors.session.max_concurrent_sessions_per_device:1}")
268 @Getter 268 @Getter
@@ -31,10 +31,13 @@ import org.thingsboard.server.actors.service.ContextBasedCreator; @@ -31,10 +31,13 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
31 import org.thingsboard.server.actors.service.DefaultActorService; 31 import org.thingsboard.server.actors.service.DefaultActorService;
32 import org.thingsboard.server.common.data.EntityType; 32 import org.thingsboard.server.common.data.EntityType;
33 import org.thingsboard.server.common.data.Tenant; 33 import org.thingsboard.server.common.data.Tenant;
  34 +import org.thingsboard.server.common.data.edge.Edge;
34 import org.thingsboard.server.common.data.id.DeviceId; 35 import org.thingsboard.server.common.data.id.DeviceId;
  36 +import org.thingsboard.server.common.data.id.EdgeId;
35 import org.thingsboard.server.common.data.id.EntityId; 37 import org.thingsboard.server.common.data.id.EntityId;
36 import org.thingsboard.server.common.data.id.RuleChainId; 38 import org.thingsboard.server.common.data.id.RuleChainId;
37 import org.thingsboard.server.common.data.id.TenantId; 39 import org.thingsboard.server.common.data.id.TenantId;
  40 +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
38 import org.thingsboard.server.common.data.rule.RuleChain; 41 import org.thingsboard.server.common.data.rule.RuleChain;
39 import org.thingsboard.server.common.data.rule.RuleChainType; 42 import org.thingsboard.server.common.data.rule.RuleChainType;
40 import org.thingsboard.server.common.msg.MsgType; 43 import org.thingsboard.server.common.msg.MsgType;
@@ -47,6 +50,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; @@ -47,6 +50,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
47 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; 50 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
48 import org.thingsboard.server.common.msg.queue.RuleEngineException; 51 import org.thingsboard.server.common.msg.queue.RuleEngineException;
49 import org.thingsboard.server.common.msg.queue.ServiceType; 52 import org.thingsboard.server.common.msg.queue.ServiceType;
  53 +import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
50 54
51 import java.util.List; 55 import java.util.List;
52 import java.util.Optional; 56 import java.util.Optional;
@@ -202,7 +206,18 @@ public class TenantActor extends RuleChainManagerActor { @@ -202,7 +206,18 @@ public class TenantActor extends RuleChainManagerActor {
202 } 206 }
203 207
204 private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { 208 private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
205 - if (isRuleEngineForCurrentTenant) { 209 + if (msg.getEntityId().getEntityType() == EntityType.EDGE) {
  210 + EdgeId edgeId = new EdgeId(msg.getEntityId().getId());
  211 + EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService();
  212 + if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
  213 + edgeRpcService.deleteEdge(edgeId);
  214 + } else {
  215 + Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId);
  216 + if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) {
  217 + edgeRpcService.updateEdge(edge);
  218 + }
  219 + }
  220 + } else if (isRuleEngineForCurrentTenant) {
206 TbActorRef target = getEntityActorRef(msg.getEntityId()); 221 TbActorRef target = getEntityActorRef(msg.getEntityId());
207 if (target != null) { 222 if (target != null) {
208 if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) { 223 if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) {
@@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.id.RuleChainId;
40 import org.thingsboard.server.common.data.id.TenantId; 40 import org.thingsboard.server.common.data.id.TenantId;
41 import org.thingsboard.server.common.data.page.TextPageData; 41 import org.thingsboard.server.common.data.page.TextPageData;
42 import org.thingsboard.server.common.data.page.TextPageLink; 42 import org.thingsboard.server.common.data.page.TextPageLink;
  43 +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
43 import org.thingsboard.server.common.data.rule.RuleChain; 44 import org.thingsboard.server.common.data.rule.RuleChain;
44 import org.thingsboard.server.dao.exception.DataValidationException; 45 import org.thingsboard.server.dao.exception.DataValidationException;
45 import org.thingsboard.server.dao.exception.IncorrectParameterException; 46 import org.thingsboard.server.dao.exception.IncorrectParameterException;
@@ -101,6 +102,9 @@ public class EdgeController extends BaseController { @@ -101,6 +102,9 @@ public class EdgeController extends BaseController {
101 edgeService.assignDefaultRuleChainsToEdge(tenantId, savedEdge.getId()); 102 edgeService.assignDefaultRuleChainsToEdge(tenantId, savedEdge.getId());
102 } 103 }
103 104
  105 + tbClusterService.onEntityStateChange(savedEdge.getTenantId(), savedEdge.getId(),
  106 + created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
  107 +
104 logEntityAction(savedEdge.getId(), savedEdge, null, created ? ActionType.ADDED : ActionType.UPDATED, null); 108 logEntityAction(savedEdge.getId(), savedEdge, null, created ? ActionType.ADDED : ActionType.UPDATED, null);
105 return savedEdge; 109 return savedEdge;
106 } catch (Exception e) { 110 } catch (Exception e) {
@@ -120,6 +124,9 @@ public class EdgeController extends BaseController { @@ -120,6 +124,9 @@ public class EdgeController extends BaseController {
120 Edge edge = checkEdgeId(edgeId, Operation.DELETE); 124 Edge edge = checkEdgeId(edgeId, Operation.DELETE);
121 edgeService.deleteEdge(getTenantId(), edgeId); 125 edgeService.deleteEdge(getTenantId(), edgeId);
122 126
  127 + tbClusterService.onEntityStateChange(getTenantId(), edgeId,
  128 + ComponentLifecycleEvent.DELETED);
  129 +
123 logEntityAction(edgeId, edge, 130 logEntityAction(edgeId, edge,
124 null, 131 null,
125 ActionType.DELETED, null, strEdgeId); 132 ActionType.DELETED, null, strEdgeId);
@@ -284,6 +291,8 @@ public class EdgeController extends BaseController { @@ -284,6 +291,8 @@ public class EdgeController extends BaseController {
284 291
285 Edge updatedEdge = edgeNotificationService.setEdgeRootRuleChain(getTenantId(), edge, ruleChainId); 292 Edge updatedEdge = edgeNotificationService.setEdgeRootRuleChain(getTenantId(), edge, ruleChainId);
286 293
  294 + tbClusterService.onEntityStateChange(updatedEdge.getTenantId(), updatedEdge.getId(), ComponentLifecycleEvent.UPDATED);
  295 +
287 logEntityAction(updatedEdge.getId(), updatedEdge, null, ActionType.UPDATED, null); 296 logEntityAction(updatedEdge.getId(), updatedEdge, null, ActionType.UPDATED, null);
288 297
289 return updatedEdge; 298 return updatedEdge;
@@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Value; @@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Value;
27 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 27 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
28 import org.springframework.stereotype.Service; 28 import org.springframework.stereotype.Service;
29 import org.thingsboard.server.common.data.DataConstants; 29 import org.thingsboard.server.common.data.DataConstants;
  30 +import org.thingsboard.server.common.data.edge.Edge;
30 import org.thingsboard.server.common.data.id.EdgeId; 31 import org.thingsboard.server.common.data.id.EdgeId;
31 import org.thingsboard.server.common.data.id.TenantId; 32 import org.thingsboard.server.common.data.id.TenantId;
32 import org.thingsboard.server.common.data.kv.BasicTsKvEntry; 33 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
@@ -52,7 +53,7 @@ import java.util.concurrent.Executors; @@ -52,7 +53,7 @@ import java.util.concurrent.Executors;
52 @Service 53 @Service
53 @Slf4j 54 @Slf4j
54 @ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true") 55 @ConditionalOnProperty(prefix = "edges.rpc", value = "enabled", havingValue = "true")
55 -public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { 56 +public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
56 57
57 private final Map<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>(); 58 private final Map<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
58 private static final ObjectMapper mapper = new ObjectMapper(); 59 private static final ObjectMapper mapper = new ObjectMapper();
@@ -117,6 +118,23 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { @@ -117,6 +118,23 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
117 return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper).getInputStream(); 118 return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper).getInputStream();
118 } 119 }
119 120
  121 + @Override
  122 + public void updateEdge(Edge edge) {
  123 + EdgeGrpcSession session = sessions.get(edge.getId());
  124 + if (session != null && session.isConnected()) {
  125 + session.onConfigurationUpdate(edge);
  126 + }
  127 + }
  128 +
  129 + @Override
  130 + public void deleteEdge(EdgeId edgeId) {
  131 + EdgeGrpcSession session = sessions.get(edgeId);
  132 + if (session != null && session.isConnected()) {
  133 + session.close();
  134 + sessions.remove(edgeId);
  135 + }
  136 + }
  137 +
120 private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) { 138 private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
121 sessions.put(edgeId, edgeGrpcSession); 139 sessions.put(edgeId, edgeGrpcSession);
122 save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true); 140 save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true);
@@ -192,6 +192,20 @@ public final class EdgeGrpcSession implements Closeable { @@ -192,6 +192,20 @@ public final class EdgeGrpcSession implements Closeable {
192 }; 192 };
193 } 193 }
194 194
  195 + void onConfigurationUpdate(Edge edge) {
  196 + try {
  197 + this.edge = edge;
  198 + // TODO: voba - push edge configuration update to edge
  199 +// outputStream.onNext(org.thingsboard.server.gen.integration.ResponseMsg.newBuilder()
  200 +// .setIntegrationUpdateMsg(IntegrationUpdateMsg.newBuilder()
  201 +// .setConfiguration(constructIntegrationConfigProto(configuration, defaultConverterProto, downLinkConverterProto))
  202 +// .build())
  203 +// .build());
  204 + } catch (Exception e) {
  205 + log.error("Failed to construct proto objects!", e);
  206 + }
  207 + }
  208 +
195 void processHandleMessages() throws ExecutionException, InterruptedException { 209 void processHandleMessages() throws ExecutionException, InterruptedException {
196 Long queueStartTs = getQueueStartTs().get(); 210 Long queueStartTs = getQueueStartTs().get();
197 TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true); 211 TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), queueStartTs, null, true);
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.edge.rpc;
  17 +
  18 +import org.thingsboard.server.common.data.edge.Edge;
  19 +import org.thingsboard.server.common.data.id.EdgeId;
  20 +
  21 +public interface EdgeRpcService {
  22 +
  23 + void updateEdge(Edge edge);
  24 +
  25 + void deleteEdge(EdgeId edgeId);
  26 +}