Commit ad5e8c0b96c4129b42ad92f9b69ffd0123446346

Authored by Volodymyr Babak
1 parent a770a59b

Added proto update messages and base logic

... ... @@ -54,6 +54,7 @@ import org.thingsboard.server.dao.cassandra.CassandraCluster;
54 54 import org.thingsboard.server.dao.customer.CustomerService;
55 55 import org.thingsboard.server.dao.dashboard.DashboardService;
56 56 import org.thingsboard.server.dao.device.DeviceService;
  57 +import org.thingsboard.server.dao.edge.EdgeService;
57 58 import org.thingsboard.server.dao.entityview.EntityViewService;
58 59 import org.thingsboard.server.dao.event.EventService;
59 60 import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor;
... ... @@ -245,6 +246,11 @@ public class ActorSystemContext {
245 246 @Getter
246 247 private RuleChainTransactionService ruleChainTransactionService;
247 248
  249 + @Lazy
  250 + @Autowired
  251 + @Getter
  252 + private EdgeService edgeService;
  253 +
248 254 @Value("${cluster.partition_id}")
249 255 @Getter
250 256 private long queuePartitionId;
... ...
... ... @@ -28,7 +28,9 @@ import org.thingsboard.server.actors.ActorSystemContext;
28 28 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
29 29 import org.thingsboard.server.actors.service.DefaultActorService;
30 30 import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
  31 +import org.thingsboard.server.common.data.DataConstants;
31 32 import org.thingsboard.server.common.data.EntityType;
  33 +import org.thingsboard.server.common.data.ShortEdgeInfo;
32 34 import org.thingsboard.server.common.data.id.EntityId;
33 35 import org.thingsboard.server.common.data.id.RuleChainId;
34 36 import org.thingsboard.server.common.data.id.RuleNodeId;
... ... @@ -43,6 +45,7 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
43 45 import org.thingsboard.server.common.msg.cluster.ServerAddress;
44 46 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
45 47 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
  48 +import org.thingsboard.server.dao.edge.EdgeService;
46 49 import org.thingsboard.server.dao.rule.RuleChainService;
47 50
48 51 import java.util.ArrayList;
... ... @@ -65,6 +68,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
65 68 private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
66 69 private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
67 70 private final RuleChainService service;
  71 + private final EdgeService edgeService;
68 72
69 73 private RuleNodeId firstId;
70 74 private RuleNodeCtx firstNode;
... ... @@ -79,6 +83,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
79 83 this.nodeActors = new HashMap<>();
80 84 this.nodeRoutes = new HashMap<>();
81 85 this.service = systemContext.getRuleChainService();
  86 + this.edgeService = systemContext.getEdgeService();
82 87 this.ruleChainName = ruleChainId.toString();
83 88 }
84 89
... ... @@ -326,6 +331,19 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
326 331 if (nodeCtx != null) {
327 332 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType), self);
328 333 }
  334 + pushUpdatesToEdges(msg);
  335 + }
  336 +
  337 + private void pushUpdatesToEdges(TbMsg msg) {
  338 + switch (msg.getType()) {
  339 + case DataConstants.ENTITY_CREATED:
  340 + case DataConstants.ENTITY_UPDATED:
  341 + case DataConstants.ENTITY_DELETED:
  342 + case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
  343 + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
  344 + edgeService.pushEventToEdge(tenantId, msg);
  345 + }
  346 +
329 347 }
330 348
331 349 private TbMsg enrichWithRuleChainId(TbMsg tbMsg) {
... ...
... ... @@ -610,6 +610,12 @@ public abstract class BaseController {
610 610 case ALARM_CLEAR:
611 611 msgType = DataConstants.ALARM_CLEAR;
612 612 break;
  613 + case ASSIGNED_TO_EDGE:
  614 + msgType = DataConstants.ENTITY_ASSIGNED_TO_EDGE;
  615 + break;
  616 + case UNASSIGNED_FROM_EDGE:
  617 + msgType = DataConstants.ENTITY_UNASSIGNED_FROM_EDGE;
  618 + break;
613 619 }
614 620 if (!StringUtils.isEmpty(msgType)) {
615 621 try {
... ... @@ -629,6 +635,12 @@ public abstract class BaseController {
629 635 String strCustomerName = extractParameter(String.class, 2, additionalInfo);
630 636 metaData.putValue("unassignedCustomerId", strCustomerId);
631 637 metaData.putValue("unassignedCustomerName", strCustomerName);
  638 + } if (actionType == ActionType.ASSIGNED_TO_EDGE) {
  639 + String strEdgeId = extractParameter(String.class, 1, additionalInfo);
  640 + metaData.putValue("assignedEdgeId", strEdgeId);
  641 + } else if (actionType == ActionType.UNASSIGNED_FROM_EDGE) {
  642 + String strEdgeId = extractParameter(String.class, 1, additionalInfo);
  643 + metaData.putValue("unassignedEdgeId", strEdgeId);
632 644 }
633 645 ObjectNode entityNode;
634 646 if (entity != null) {
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.edge;
2 17
3 18 import lombok.Data;
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.service.edge.rpc;
17 17
  18 +import com.fasterxml.jackson.databind.ObjectMapper;
18 19 import com.google.common.io.Resources;
19 20 import io.grpc.Server;
20 21 import io.grpc.ServerBuilder;
... ... @@ -24,7 +25,18 @@ import org.springframework.beans.factory.annotation.Autowired;
24 25 import org.springframework.beans.factory.annotation.Value;
25 26 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
26 27 import org.springframework.stereotype.Service;
  28 +import org.thingsboard.server.common.data.Event;
  29 +import org.thingsboard.server.common.data.edge.Edge;
  30 +import org.thingsboard.server.common.data.id.DeviceId;
27 31 import org.thingsboard.server.common.data.id.EdgeId;
  32 +import org.thingsboard.server.common.data.id.TenantId;
  33 +import org.thingsboard.server.common.data.page.TimePageData;
  34 +import org.thingsboard.server.common.data.page.TimePageLink;
  35 +import org.thingsboard.server.dao.asset.AssetService;
  36 +import org.thingsboard.server.dao.attributes.AttributesService;
  37 +import org.thingsboard.server.dao.device.DeviceService;
  38 +import org.thingsboard.server.dao.edge.EdgeService;
  39 +import org.thingsboard.server.dao.event.EventService;
28 40 import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
29 41 import org.thingsboard.server.gen.edge.RequestMsg;
30 42 import org.thingsboard.server.gen.edge.ResponseMsg;
... ... @@ -36,6 +48,9 @@ import java.io.File;
36 48 import java.io.IOException;
37 49 import java.util.Map;
38 50 import java.util.concurrent.ConcurrentHashMap;
  51 +import java.util.concurrent.ExecutorService;
  52 +import java.util.concurrent.Executors;
  53 +import java.util.concurrent.TimeUnit;
39 54
40 55 @Service
41 56 @Slf4j
... ... @@ -43,6 +58,7 @@ import java.util.concurrent.ConcurrentHashMap;
43 58 public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
44 59
45 60 private final Map<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
  61 + private static final ObjectMapper objectMapper = new ObjectMapper();
46 62
47 63 @Value("${edges.rpc.port}")
48 64 private int rpcPort;
... ... @@ -56,8 +72,22 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
56 72 @Autowired
57 73 private EdgeContextComponent ctx;
58 74
  75 + @Autowired
  76 + private EdgeService edgeService;
  77 +
  78 + @Autowired
  79 + private AssetService assetService;
  80 +
  81 + @Autowired
  82 + private DeviceService deviceService;
  83 +
  84 + @Autowired
  85 + private AttributesService attributesService;
  86 +
59 87 private Server server;
60 88
  89 + private ExecutorService executor;
  90 +
61 91 @PostConstruct
62 92 public void init() {
63 93 log.info("Initializing Edge RPC service!");
... ... @@ -81,9 +111,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
81 111 throw new RuntimeException("Failed to start Edge RPC server!");
82 112 }
83 113 log.info("Edge RPC service initialized!");
  114 + executor = Executors.newSingleThreadExecutor();
  115 + processHandleMessages();
84 116 }
85 117
86   -
87 118 @PreDestroy
88 119 public void destroy() {
89 120 if (server != null) {
... ... @@ -92,14 +123,28 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
92 123 }
93 124
94 125 @Override
95   - public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> responseObserver) {
96   - return new EdgeGrpcSession(ctx, responseObserver, this::onEdgeConnect, this::onEdgeDisconnect).getInputStream();
  126 + public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
  127 + return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, edgeService, assetService, deviceService, attributesService, objectMapper).getInputStream();
97 128 }
98 129
99 130 private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
100 131 sessions.put(edgeId, edgeGrpcSession);
101 132 }
102 133
  134 + private void processHandleMessages() {
  135 + executor.submit(() -> {
  136 + while (!Thread.interrupted()) {
  137 + try {
  138 + for (EdgeGrpcSession session : sessions.values()) {
  139 + session.processHandleMessages();
  140 + }
  141 + } catch (Exception e) {
  142 + log.warn("Failed to process messages handling!", e);
  143 + }
  144 + }
  145 + });
  146 + }
  147 +
103 148 private void onEdgeDisconnect(EdgeId edgeId) {
104 149 sessions.remove(edgeId);
105 150 }
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.edge.rpc;
2 17
  18 +import com.datastax.driver.core.utils.UUIDs;
3 19 import com.fasterxml.jackson.core.JsonProcessingException;
  20 +import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import com.google.common.util.concurrent.Futures;
  22 +import com.google.common.util.concurrent.ListenableFuture;
4 23 import io.grpc.stub.StreamObserver;
5 24 import lombok.Data;
6 25 import lombok.extern.slf4j.Slf4j;
  26 +import org.thingsboard.server.common.data.Dashboard;
  27 +import org.thingsboard.server.common.data.DataConstants;
  28 +import org.thingsboard.server.common.data.Device;
  29 +import org.thingsboard.server.common.data.EntityType;
  30 +import org.thingsboard.server.common.data.EntityView;
  31 +import org.thingsboard.server.common.data.Event;
  32 +import org.thingsboard.server.common.data.asset.Asset;
7 33 import org.thingsboard.server.common.data.edge.Edge;
  34 +import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
  35 +import org.thingsboard.server.common.data.id.AssetId;
  36 +import org.thingsboard.server.common.data.id.DeviceId;
8 37 import org.thingsboard.server.common.data.id.EdgeId;
9 38 import org.thingsboard.server.common.data.id.TenantId;
  39 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  40 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
  41 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  42 +import org.thingsboard.server.common.data.page.TimePageData;
  43 +import org.thingsboard.server.common.data.page.TimePageLink;
  44 +import org.thingsboard.server.common.data.rule.RuleChain;
  45 +import org.thingsboard.server.dao.asset.AssetService;
  46 +import org.thingsboard.server.dao.attributes.AttributesService;
  47 +import org.thingsboard.server.dao.device.DeviceService;
  48 +import org.thingsboard.server.dao.edge.EdgeService;
  49 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
  50 +import org.thingsboard.server.gen.edge.AssetUpdateMsg;
10 51 import org.thingsboard.server.gen.edge.ConnectRequestMsg;
11 52 import org.thingsboard.server.gen.edge.ConnectResponseCode;
12 53 import org.thingsboard.server.gen.edge.ConnectResponseMsg;
13   -import org.thingsboard.server.gen.edge.EdgeConfigurationProto;
  54 +import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
  55 +import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
  56 +import org.thingsboard.server.gen.edge.EdgeConfiguration;
  57 +import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
14 58 import org.thingsboard.server.gen.edge.RequestMsg;
15 59 import org.thingsboard.server.gen.edge.RequestMsgType;
16 60 import org.thingsboard.server.gen.edge.ResponseMsg;
  61 +import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
  62 +import org.thingsboard.server.gen.edge.UpdateMsgType;
17 63 import org.thingsboard.server.gen.edge.UplinkMsg;
18 64 import org.thingsboard.server.gen.edge.UplinkResponseMsg;
19 65 import org.thingsboard.server.service.edge.EdgeContextComponent;
20 66
  67 +import java.util.Arrays;
  68 +import java.util.Collections;
  69 +import java.util.List;
21 70 import java.util.Optional;
22 71 import java.util.UUID;
  72 +import java.util.concurrent.ExecutionException;
23 73 import java.util.function.BiConsumer;
24 74 import java.util.function.Consumer;
25 75
... ... @@ -30,6 +80,7 @@ public final class EdgeGrpcSession implements Cloneable {
30 80 private final UUID sessionId;
31 81 private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
32 82 private final Consumer<EdgeId> sessionCloseListener;
  83 + private final ObjectMapper objectMapper;
33 84
34 85 private EdgeContextComponent ctx;
35 86 private Edge edge;
... ... @@ -37,14 +88,24 @@ public final class EdgeGrpcSession implements Cloneable {
37 88 private StreamObserver<ResponseMsg> outputStream;
38 89 private boolean connected;
39 90
40   - EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream
41   - , BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener
42   - , Consumer<EdgeId> sessionCloseListener) {
  91 + private EdgeService edgeService;
  92 + private AssetService assetService;
  93 + private DeviceService deviceService;
  94 + private AttributesService attributesService;
  95 +
  96 + EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
  97 + BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener, Consumer<EdgeId> sessionCloseListener,
  98 + EdgeService edgeService, AssetService assetService, DeviceService deviceService, AttributesService attributesService, ObjectMapper objectMapper) {
43 99 this.sessionId = UUID.randomUUID();
44 100 this.ctx = ctx;
45 101 this.outputStream = outputStream;
46 102 this.sessionOpenListener = sessionOpenListener;
47 103 this.sessionCloseListener = sessionCloseListener;
  104 + this.objectMapper = objectMapper;
  105 + this.edgeService = edgeService;
  106 + this.assetService = assetService;
  107 + this.deviceService = deviceService;
  108 + this.attributesService = attributesService;
48 109 initInputStream();
49 110 }
50 111
... ... @@ -83,6 +144,193 @@ public final class EdgeGrpcSession implements Cloneable {
83 144 };
84 145 }
85 146
  147 + void processHandleMessages() throws ExecutionException, InterruptedException {
  148 + Long queueStartTs = getQueueStartTs().get();
  149 + // TODO: this 100 value must be chagned properly
  150 + TimePageLink pageLink = new TimePageLink(30, queueStartTs + 1000);
  151 + TimePageData<Event> pageData;
  152 + UUID ifOffset = null;
  153 + do {
  154 + pageData = edgeService.findQueueEvents(edge.getTenantId(), edge.getId(), pageLink);
  155 + if (!pageData.getData().isEmpty()) {
  156 + for (Event event : pageData.getData()) {
  157 + EdgeQueueEntry entry;
  158 + try {
  159 + entry = objectMapper.treeToValue(event.getBody(), EdgeQueueEntry.class);
  160 + UpdateMsgType msgType = getResponseMsgType(entry.getType());
  161 + switch (entry.getEntityType()) {
  162 + case DEVICE:
  163 + Device device = objectMapper.readValue(entry.getData(), Device.class);
  164 + onDeviceUpdated(msgType, device);
  165 + break;
  166 + case ASSET:
  167 + Asset asset = objectMapper.readValue(entry.getData(), Asset.class);
  168 + onAssetUpdated(msgType, asset);
  169 + break;
  170 + case ENTITY_VIEW:
  171 + EntityView entityView = objectMapper.readValue(entry.getData(), EntityView.class);
  172 + onEntityViewUpdated(msgType, entityView);
  173 + break;
  174 + case DASHBOARD:
  175 + Dashboard dashboard = objectMapper.readValue(entry.getData(), Dashboard.class);
  176 + onDashboardUpdated(msgType, dashboard);
  177 + break;
  178 + case RULE_CHAIN:
  179 + RuleChain ruleChain = objectMapper.readValue(entry.getData(), RuleChain.class);
  180 + onRuleChainUpdated(msgType, ruleChain);
  181 + break;
  182 + }
  183 + } catch (Exception e) {
  184 + log.error("Exception during processing records from queue", e);
  185 + }
  186 + ifOffset = event.getUuidId();
  187 + }
  188 + }
  189 + if (pageData.hasNext()) {
  190 + pageLink = pageData.getNextPageLink();
  191 + }
  192 + } while (pageData.hasNext());
  193 +
  194 + if (ifOffset != null) {
  195 + Long newStartTs = UUIDs.unixTimestamp(ifOffset);
  196 + updateQueueStartTs(newStartTs);
  197 + }
  198 + try {
  199 + Thread.sleep(1000);
  200 + } catch (InterruptedException e) {
  201 + log.error("Error during sleep", e);
  202 + }
  203 + }
  204 +
  205 + private void updateQueueStartTs(Long newStartTs) {
  206 + List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry("queueStartTs", newStartTs), System.currentTimeMillis()));
  207 + attributesService.save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
  208 + }
  209 +
  210 + private ListenableFuture<Long> getQueueStartTs() {
  211 + ListenableFuture<Optional<AttributeKvEntry>> future =
  212 + attributesService.find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, "queueStartTs");
  213 + return Futures.transform(future, attributeKvEntryOpt -> {
  214 + if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
  215 + AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();
  216 + return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
  217 + } else {
  218 + return 0L;
  219 + }
  220 + } );
  221 + }
  222 +
  223 + private void onDeviceUpdated(UpdateMsgType msgType, Device device) {
  224 + outputStream.onNext(ResponseMsg.newBuilder()
  225 + .setDeviceUpdateMsg(constructDeviceUpdatedMsg(msgType, device))
  226 + .build());
  227 + }
  228 +
  229 + private void onAssetUpdated(UpdateMsgType msgType, Asset asset) {
  230 + outputStream.onNext(ResponseMsg.newBuilder()
  231 + .setAssetUpdateMsg(constructAssetUpdatedMsg(msgType, asset))
  232 + .build());
  233 + }
  234 +
  235 + private void onEntityViewUpdated(UpdateMsgType msgType, EntityView entityView) {
  236 + outputStream.onNext(ResponseMsg.newBuilder()
  237 + .setEntityViewUpdateMsg(constructEntityViewUpdatedMsg(msgType, entityView))
  238 + .build());
  239 + }
  240 +
  241 + private void onRuleChainUpdated(UpdateMsgType msgType, RuleChain ruleChain) {
  242 + outputStream.onNext(ResponseMsg.newBuilder()
  243 + .setRuleChainUpdateMsg(constructRuleChainUpdatedMsg(msgType, ruleChain))
  244 + .build());
  245 + }
  246 +
  247 + private void onDashboardUpdated(UpdateMsgType msgType, Dashboard dashboard) {
  248 + outputStream.onNext(ResponseMsg.newBuilder()
  249 + .setDashboardUpdateMsg(constructDashboardUpdatedMsg(msgType, dashboard))
  250 + .build());
  251 + }
  252 +
  253 + private UpdateMsgType getResponseMsgType(String msgType) {
  254 + switch (msgType) {
  255 + case DataConstants.ENTITY_UPDATED:
  256 + return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
  257 + case DataConstants.ENTITY_CREATED:
  258 + case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
  259 + return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
  260 + case DataConstants.ENTITY_DELETED:
  261 + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
  262 + return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
  263 + default:
  264 + throw new RuntimeException("Unsupported mstType [" + msgType + "]");
  265 + }
  266 + }
  267 +
  268 + private RuleChainUpdateMsg constructRuleChainUpdatedMsg(UpdateMsgType msgType, RuleChain ruleChain) {
  269 + RuleChainUpdateMsg.Builder builder = RuleChainUpdateMsg.newBuilder()
  270 + .setMsgType(msgType)
  271 + .setIdMSB(ruleChain.getId().getId().getMostSignificantBits())
  272 + .setIdLSB(ruleChain.getId().getId().getLeastSignificantBits())
  273 + .setName(ruleChain.getName())
  274 + .setRoot(ruleChain.isRoot())
  275 + .setDebugMode(ruleChain.isDebugMode())
  276 + .setConfiguration(JacksonUtil.toString(ruleChain.getConfiguration()));
  277 + if (ruleChain.getFirstRuleNodeId() != null) {
  278 + builder.setFirstRuleNodeIdMSB(ruleChain.getFirstRuleNodeId().getId().getMostSignificantBits())
  279 + .setFirstRuleNodeIdLSB(ruleChain.getFirstRuleNodeId().getId().getLeastSignificantBits());
  280 + }
  281 + return builder.build();
  282 + }
  283 +
  284 + private DashboardUpdateMsg constructDashboardUpdatedMsg(UpdateMsgType msgType, Dashboard dashboard) {
  285 + DashboardUpdateMsg.Builder builder = DashboardUpdateMsg.newBuilder()
  286 + .setMsgType(msgType)
  287 + .setIdMSB(dashboard.getId().getId().getMostSignificantBits())
  288 + .setIdLSB(dashboard.getId().getId().getLeastSignificantBits())
  289 + .setName(dashboard.getName());
  290 + return builder.build();
  291 + }
  292 +
  293 + private DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) {
  294 + DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
  295 + .setMsgType(msgType)
  296 + .setName(device.getName())
  297 + .setType(device.getName());
  298 + return builder.build();
  299 + }
  300 +
  301 + private AssetUpdateMsg constructAssetUpdatedMsg(UpdateMsgType msgType, Asset asset) {
  302 + AssetUpdateMsg.Builder builder = AssetUpdateMsg.newBuilder()
  303 + .setMsgType(msgType)
  304 + .setName(asset.getName())
  305 + .setType(asset.getName());
  306 + return builder.build();
  307 + }
  308 +
  309 + private EntityViewUpdateMsg constructEntityViewUpdatedMsg(UpdateMsgType msgType, EntityView entityView) {
  310 + String relatedName;
  311 + String relatedType;
  312 + org.thingsboard.server.gen.edge.EntityType relatedEntityType;
  313 + if (entityView.getEntityId().getEntityType().equals(EntityType.DEVICE)) {
  314 + Device device = deviceService.findDeviceById(entityView.getTenantId(), new DeviceId(entityView.getEntityId().getId()));
  315 + relatedName = device.getName();
  316 + relatedType = device.getType();
  317 + relatedEntityType = org.thingsboard.server.gen.edge.EntityType.DEVICE;
  318 + } else {
  319 + Asset asset = assetService.findAssetById(entityView.getTenantId(), new AssetId(entityView.getEntityId().getId()));
  320 + relatedName = asset.getName();
  321 + relatedType = asset.getType();
  322 + relatedEntityType = org.thingsboard.server.gen.edge.EntityType.ASSET;
  323 + }
  324 + EntityViewUpdateMsg.Builder builder = EntityViewUpdateMsg.newBuilder()
  325 + .setMsgType(msgType)
  326 + .setName(entityView.getName())
  327 + .setType(entityView.getName())
  328 + .setRelatedName(relatedName)
  329 + .setRelatedType(relatedType)
  330 + .setRelatedEntityType(relatedEntityType);
  331 + return builder.build();
  332 + }
  333 +
86 334 private UplinkResponseMsg processUplinkMsg(UplinkMsg uplinkMsg) {
87 335 return null;
88 336 }
... ... @@ -103,23 +351,23 @@ public final class EdgeGrpcSession implements Cloneable {
103 351 return ConnectResponseMsg.newBuilder()
104 352 .setResponseCode(ConnectResponseCode.BAD_CREDENTIALS)
105 353 .setErrorMsg("Failed to validate the edge!")
106   - .setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build();
  354 + .setConfiguration(EdgeConfiguration.getDefaultInstance()).build();
107 355 } catch (Exception e) {
108 356 log.error("[{}] Failed to process edge connection!", request.getEdgeRoutingKey(), e);
109 357 return ConnectResponseMsg.newBuilder()
110 358 .setResponseCode(ConnectResponseCode.SERVER_UNAVAILABLE)
111 359 .setErrorMsg("Failed to process edge connection!")
112   - .setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build();
  360 + .setConfiguration(EdgeConfiguration.getDefaultInstance()).build();
113 361 }
114 362 }
115 363 return ConnectResponseMsg.newBuilder()
116 364 .setResponseCode(ConnectResponseCode.BAD_CREDENTIALS)
117 365 .setErrorMsg("Failed to find the edge! Routing key: " + request.getEdgeRoutingKey())
118   - .setConfiguration(EdgeConfigurationProto.getDefaultInstance()).build();
  366 + .setConfiguration(EdgeConfiguration.getDefaultInstance()).build();
119 367 }
120 368
121   - private EdgeConfigurationProto constructEdgeConfigProto(Edge edge) throws JsonProcessingException {
122   - return EdgeConfigurationProto.newBuilder()
  369 + private EdgeConfiguration constructEdgeConfigProto(Edge edge) throws JsonProcessingException {
  370 + return EdgeConfiguration.newBuilder()
123 371 .setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits())
124 372 .setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits())
125 373 .setName(edge.getName())
... ...
... ... @@ -15,15 +15,22 @@
15 15 */
16 16 package org.thingsboard.server.dao.edge;
17 17
  18 +import com.fasterxml.jackson.databind.JsonNode;
18 19 import com.google.common.util.concurrent.ListenableFuture;
19 20 import org.thingsboard.server.common.data.EntitySubtype;
  21 +import org.thingsboard.server.common.data.Event;
  22 +import org.thingsboard.server.common.data.Tenant;
20 23 import org.thingsboard.server.common.data.edge.Edge;
21 24 import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
22 25 import org.thingsboard.server.common.data.id.CustomerId;
23 26 import org.thingsboard.server.common.data.id.EdgeId;
  27 +import org.thingsboard.server.common.data.id.EntityId;
24 28 import org.thingsboard.server.common.data.id.TenantId;
25 29 import org.thingsboard.server.common.data.page.TextPageData;
26 30 import org.thingsboard.server.common.data.page.TextPageLink;
  31 +import org.thingsboard.server.common.data.page.TimePageData;
  32 +import org.thingsboard.server.common.data.page.TimePageLink;
  33 +import org.thingsboard.server.common.msg.TbMsg;
27 34
28 35 import java.util.List;
29 36 import java.util.Optional;
... ... @@ -66,6 +73,9 @@ public interface EdgeService {
66 73
67 74 ListenableFuture<List<EntitySubtype>> findEdgeTypesByTenantId(TenantId tenantId);
68 75
  76 + void pushEventToEdge(TenantId tenantId, TbMsg tbMsg);
  77 +
  78 + TimePageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink);
69 79 }
70 80
71 81
... ...
... ... @@ -56,6 +56,8 @@ public class DataConstants {
56 56 public static final String ATTRIBUTES_DELETED = "ATTRIBUTES_DELETED";
57 57 public static final String ALARM_ACK = "ALARM_ACK";
58 58 public static final String ALARM_CLEAR = "ALARM_CLEAR";
  59 + public static final String ENTITY_ASSIGNED_TO_EDGE = "ENTITY_ASSIGNED_TO_EDGE";
  60 + public static final String ENTITY_UNASSIGNED_FROM_EDGE = "ENTITY_UNASSIGNED_FROM_EDGE";
59 61
60 62 public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
61 63
... ... @@ -63,4 +65,6 @@ public class DataConstants {
63 65 public static final String SECRET_KEY_FIELD_NAME = "secretKey";
64 66 public static final String DURATION_MS_FIELD_NAME = "durationMs";
65 67
  68 + public static final String EDGE_QUEUE_EVENT_TYPE = "EDGE_QUEUE";
  69 +
66 70 }
... ...
  1 +package org.thingsboard.server.common.data.edge;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.server.common.data.EntityType;
  5 +
  6 +@Data
  7 +public class EdgeQueueEntry {
  8 + private String type;
  9 + private EntityType entityType;
  10 + private String data;
  11 +}
... ...
... ... @@ -24,15 +24,20 @@ import lombok.extern.slf4j.Slf4j;
24 24 import org.springframework.beans.factory.annotation.Value;
25 25 import org.springframework.stereotype.Service;
26 26 import org.thingsboard.edge.exception.EdgeConnectionException;
27   -import org.thingsboard.server.gen.edge.CloudDownlinkDataProto;
  27 +import org.thingsboard.server.gen.edge.AssetUpdateMsg;
28 28 import org.thingsboard.server.gen.edge.ConnectRequestMsg;
29 29 import org.thingsboard.server.gen.edge.ConnectResponseCode;
30 30 import org.thingsboard.server.gen.edge.ConnectResponseMsg;
31   -import org.thingsboard.server.gen.edge.EdgeConfigurationProto;
  31 +import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
  32 +import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
  33 +import org.thingsboard.server.gen.edge.DownlinkMsg;
  34 +import org.thingsboard.server.gen.edge.EdgeConfiguration;
32 35 import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
  36 +import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
33 37 import org.thingsboard.server.gen.edge.RequestMsg;
34 38 import org.thingsboard.server.gen.edge.RequestMsgType;
35 39 import org.thingsboard.server.gen.edge.ResponseMsg;
  40 +import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
36 41 import org.thingsboard.server.gen.edge.UplinkMsg;
37 42 import org.thingsboard.server.gen.edge.UplinkResponseMsg;
38 43
... ... @@ -65,8 +70,13 @@ public class EdgeGrpcClient implements EdgeRpcClient {
65 70 public void connect(String edgeKey,
66 71 String edgeSecret,
67 72 Consumer<UplinkResponseMsg> onUplinkResponse,
68   - Consumer<EdgeConfigurationProto> onEdgeUpdate,
69   - Consumer<CloudDownlinkDataProto> onDownlink,
  73 + Consumer<EdgeConfiguration> onEdgeUpdate,
  74 + Consumer<DeviceUpdateMsg> onDeviceUpdate,
  75 + Consumer<AssetUpdateMsg> onAssetUpdate,
  76 + Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
  77 + Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
  78 + Consumer<DashboardUpdateMsg> onDashboardUpdate,
  79 + Consumer<DownlinkMsg> onDownlink,
70 80 Consumer<Exception> onError) {
71 81 NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort).usePlaintext();
72 82 if (sslEnabled) {
... ... @@ -80,7 +90,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
80 90 channel = builder.build();
81 91 EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel);
82 92 log.info("[{}] Sending a connect request to the TB!", edgeKey);
83   - this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDownlink, onError));
  93 + this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDeviceUpdate, onAssetUpdate, onEntityViewUpdate, onRuleChainUpdate, onDashboardUpdate, onDownlink, onError));
84 94 this.inputStream.onNext(RequestMsg.newBuilder()
85 95 .setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE)
86 96 .setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build())
... ... @@ -103,7 +113,16 @@ public class EdgeGrpcClient implements EdgeRpcClient {
103 113 .build());
104 114 }
105 115
106   - private StreamObserver<ResponseMsg> initOutputStream(String edgeKey, Consumer<UplinkResponseMsg> onUplinkResponse, Consumer<EdgeConfigurationProto> onEdgeUpdate, Consumer<CloudDownlinkDataProto> onDownlink, Consumer<Exception> onError) {
  116 + private StreamObserver<ResponseMsg> initOutputStream(String edgeKey,
  117 + Consumer<UplinkResponseMsg> onUplinkResponse,
  118 + Consumer<EdgeConfiguration> onEdgeUpdate,
  119 + Consumer<DeviceUpdateMsg> onDeviceUpdate,
  120 + Consumer<AssetUpdateMsg> onAssetUpdate,
  121 + Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
  122 + Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
  123 + Consumer<DashboardUpdateMsg> onDashboardUpdate,
  124 + Consumer<DownlinkMsg> onDownlink,
  125 + Consumer<Exception> onError) {
107 126 return new StreamObserver<ResponseMsg>() {
108 127 @Override
109 128 public void onNext(ResponseMsg responseMsg) {
... ... @@ -119,9 +138,24 @@ public class EdgeGrpcClient implements EdgeRpcClient {
119 138 } else if (responseMsg.hasUplinkResponseMsg()) {
120 139 log.debug("[{}] Uplink response message received {}", edgeKey, responseMsg.getUplinkResponseMsg());
121 140 onUplinkResponse.accept(responseMsg.getUplinkResponseMsg());
  141 + } else if (responseMsg.hasDeviceUpdateMsg()) {
  142 + log.debug("[{}] Device update message received {}", edgeKey, responseMsg.getDeviceUpdateMsg());
  143 + onDeviceUpdate.accept(responseMsg.getDeviceUpdateMsg());
  144 + } else if (responseMsg.hasAssetUpdateMsg()) {
  145 + log.debug("[{}] Asset update message received {}", edgeKey, responseMsg.getAssetUpdateMsg());
  146 + onAssetUpdate.accept(responseMsg.getAssetUpdateMsg());
  147 + } else if (responseMsg.hasEntityViewUpdateMsg()) {
  148 + log.debug("[{}] EntityView update message received {}", edgeKey, responseMsg.getEntityViewUpdateMsg());
  149 + onEntityViewUpdate.accept(responseMsg.getEntityViewUpdateMsg());
  150 + } else if (responseMsg.hasRuleChainUpdateMsg()) {
  151 + log.debug("[{}] Rule Chain udpate message received {}", edgeKey, responseMsg.getRuleChainUpdateMsg());
  152 + onRuleChainUpdate.accept(responseMsg.getRuleChainUpdateMsg());
  153 + } else if (responseMsg.hasDashboardUpdateMsg()) {
  154 + log.debug("[{}] Dashboard message received {}", edgeKey, responseMsg.getDashboardUpdateMsg());
  155 + onDashboardUpdate.accept(responseMsg.getDashboardUpdateMsg());
122 156 } else if (responseMsg.hasDownlinkMsg()) {
123   - log.debug("[{}] Downlink message received for device {}", edgeKey, responseMsg.getDownlinkMsg().getCloudData().getDeviceName());
124   - onDownlink.accept(responseMsg.getDownlinkMsg().getCloudData());
  157 + log.debug("[{}] Downlink message received for rule chain {}", edgeKey, responseMsg.getDownlinkMsg());
  158 + onDownlink.accept(responseMsg.getDownlinkMsg());
125 159 }
126 160 }
127 161
... ...
... ... @@ -15,8 +15,13 @@
15 15 */
16 16 package org.thingsboard.edge.rpc;
17 17
18   -import org.thingsboard.server.gen.edge.CloudDownlinkDataProto;
19   -import org.thingsboard.server.gen.edge.EdgeConfigurationProto;
  18 +import org.thingsboard.server.gen.edge.AssetUpdateMsg;
  19 +import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
  20 +import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
  21 +import org.thingsboard.server.gen.edge.DownlinkMsg;
  22 +import org.thingsboard.server.gen.edge.EdgeConfiguration;
  23 +import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
  24 +import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
20 25 import org.thingsboard.server.gen.edge.UplinkMsg;
21 26 import org.thingsboard.server.gen.edge.UplinkResponseMsg;
22 27
... ... @@ -27,11 +32,15 @@ public interface EdgeRpcClient {
27 32 void connect(String integrationKey,
28 33 String integrationSecret,
29 34 Consumer<UplinkResponseMsg> onUplinkResponse,
30   - Consumer<EdgeConfigurationProto> onEdgeUpdate,
31   - Consumer<CloudDownlinkDataProto> onDownlink,
  35 + Consumer<EdgeConfiguration> onEdgeUpdate,
  36 + Consumer<DeviceUpdateMsg> onDeviceUpdate,
  37 + Consumer<AssetUpdateMsg> onAssetUpdate,
  38 + Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
  39 + Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
  40 + Consumer<DashboardUpdateMsg> onDashboardUpdate,
  41 + Consumer<DownlinkMsg> onDownlink,
32 42 Consumer<Exception> onError);
33 43
34   -
35 44 void disconnect() throws InterruptedException;
36 45
37 46 void sendUplinkMsg(UplinkMsg uplinkMsg) throws InterruptedException;
... ...
... ... @@ -34,17 +34,21 @@ service EdgeRpcService {
34 34 message RequestMsg {
35 35 RequestMsgType msgType = 1;
36 36 ConnectRequestMsg connectRequestMsg = 2;
37   - UplinkMsg uplinkMsg = 3;
  37 + DeviceUpdateMsg deviceUpdateMsg = 3;
  38 + UplinkMsg uplinkMsg = 4;
38 39 }
39 40
40 41 message ResponseMsg {
41   - ResponseMsgType msgType = 1;
42   - ConnectResponseMsg connectResponseMsg = 2;
43   - UplinkResponseMsg uplinkResponseMsg = 3;
44   - DownlinkMsg downlinkMsg = 4;
  42 + ConnectResponseMsg connectResponseMsg = 1;
  43 + UplinkResponseMsg uplinkResponseMsg = 2;
  44 + DeviceUpdateMsg deviceUpdateMsg = 3;
  45 + RuleChainUpdateMsg ruleChainUpdateMsg = 4;
  46 + DashboardUpdateMsg dashboardUpdateMsg = 5;
  47 + AssetUpdateMsg assetUpdateMsg = 6;
  48 + EntityViewUpdateMsg entityViewUpdateMsg = 7;
  49 + DownlinkMsg downlinkMsg = 8;
45 50 }
46 51
47   -
48 52 enum RequestMsgType {
49 53 CONNECT_RPC_MESSAGE = 0;
50 54 UPLINK_RPC_MESSAGE = 1;
... ... @@ -64,10 +68,10 @@ enum ConnectResponseCode {
64 68 message ConnectResponseMsg {
65 69 ConnectResponseCode responseCode = 1;
66 70 string errorMsg = 2;
67   - EdgeConfigurationProto configuration = 3;
  71 + EdgeConfiguration configuration = 3;
68 72 }
69 73
70   -message EdgeConfigurationProto {
  74 +message EdgeConfiguration {
71 75 int64 tenantIdMSB = 1;
72 76 int64 tenantIdLSB = 2;
73 77 string name = 5;
... ... @@ -75,23 +79,98 @@ message EdgeConfigurationProto {
75 79 string type = 7;
76 80 }
77 81
78   -enum ResponseMsgType {
79   - SAVE_ENTITY_MESSAGE = 0;
80   - DELETE_ENTITY_MESSAGE = 1;
  82 +enum UpdateMsgType {
  83 + ENTITY_CREATED_RPC_MESSAGE = 0;
  84 + ENTITY_UPDATED_RPC_MESSAGE = 1;
  85 + ENTITY_DELETED_RPC_MESSAGE = 2;
81 86 }
82 87
83   -message CloudDownlinkDataProto {
  88 +message DeviceData {
84 89 string deviceName = 1;
85 90 string deviceType = 2;
86 91 bytes tbMsg = 3;
87 92 }
88 93
  94 +message AssetData {
  95 + string assetName = 1;
  96 + string assetType = 2;
  97 + bytes tbMsg = 3;
  98 +}
  99 +
  100 +message EntityViewData {
  101 + string entityViewName = 1;
  102 + string entityViewType = 2;
  103 + bytes tbMsg = 3;
  104 +}
  105 +
  106 +message RuleChainData {
  107 + string ruleChainName = 1;
  108 + string ruleChainType = 2;
  109 + bytes tbMsg = 3;
  110 +}
  111 +
  112 +message DashboardData {
  113 + string dashboardName = 1;
  114 + string dashboardType = 2;
  115 + bytes tbMsg = 3;
  116 +}
  117 +
  118 +message RuleChainUpdateMsg {
  119 + UpdateMsgType msgType = 1;
  120 + int64 idMSB = 2;
  121 + int64 idLSB = 3;
  122 + string name = 4;
  123 + int64 firstRuleNodeIdMSB = 5;
  124 + int64 firstRuleNodeIdLSB = 6;
  125 + bool root = 7;
  126 + bool debugMode = 8;
  127 + string configuration = 9;
  128 +}
  129 +
  130 +message DashboardUpdateMsg {
  131 + UpdateMsgType msgType = 1;
  132 + int64 idMSB = 2;
  133 + int64 idLSB = 3;
  134 + string name = 4;
  135 +}
  136 +
  137 +message DeviceUpdateMsg {
  138 + UpdateMsgType msgType = 1;
  139 + string name = 2;
  140 + string type = 3;
  141 +}
  142 +
  143 +message AssetUpdateMsg {
  144 + UpdateMsgType msgType = 1;
  145 + string name = 2;
  146 + string type = 3;
  147 +}
  148 +
  149 +message EntityViewUpdateMsg {
  150 + UpdateMsgType msgType = 1;
  151 + string name = 2;
  152 + string type = 3;
  153 + string relatedName = 4;
  154 + string relatedType = 5;
  155 + EntityType relatedEntityType = 6;
  156 +}
  157 +
  158 +enum EntityType {
  159 + DEVICE = 0;
  160 + ASSET = 1;
  161 +}
  162 +
89 163 /**
90 164 * Main Messages;
91 165 */
92 166
93 167 message UplinkMsg {
94 168 int32 uplinkMsgId = 1;
  169 + repeated DeviceData deviceData = 2;
  170 + repeated AssetData assetData = 3;
  171 + repeated EntityViewData entityViewData = 4;
  172 + repeated RuleChainData ruleChainData = 5;
  173 + repeated DashboardData dashboardData = 6;
95 174 }
96 175
97 176 message UplinkResponseMsg {
... ... @@ -100,5 +179,11 @@ message UplinkResponseMsg {
100 179 }
101 180
102 181 message DownlinkMsg {
103   - CloudDownlinkDataProto cloudData = 1;
  182 + int32 downlinkMsgId = 1;
  183 + repeated DeviceData deviceData = 2;
  184 + repeated AssetData assetData = 3;
  185 + repeated EntityViewData entityViewData = 4;
  186 + repeated RuleChainData ruleChainData = 5;
  187 + repeated DashboardData dashboardData = 6;
104 188 }
  189 +
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ... @@ -15,9 +15,12 @@
15 15 */
16 16 package org.thingsboard.server.dao.edge;
17 17
  18 +import com.fasterxml.jackson.databind.JsonNode;
  19 +import com.fasterxml.jackson.databind.ObjectMapper;
18 20 import com.google.common.base.Function;
19 21 import com.google.common.util.concurrent.Futures;
20 22 import com.google.common.util.concurrent.ListenableFuture;
  23 +import lombok.Data;
21 24 import lombok.extern.slf4j.Slf4j;
22 25 import org.springframework.beans.factory.annotation.Autowired;
23 26 import org.springframework.cache.Cache;
... ... @@ -27,10 +30,14 @@ import org.springframework.cache.annotation.Cacheable;
27 30 import org.springframework.stereotype.Service;
28 31 import org.springframework.util.StringUtils;
29 32 import org.thingsboard.server.common.data.Customer;
  33 +import org.thingsboard.server.common.data.DataConstants;
30 34 import org.thingsboard.server.common.data.EntitySubtype;
31 35 import org.thingsboard.server.common.data.EntityType;
  36 +import org.thingsboard.server.common.data.Event;
  37 +import org.thingsboard.server.common.data.ShortEdgeInfo;
32 38 import org.thingsboard.server.common.data.Tenant;
33 39 import org.thingsboard.server.common.data.edge.Edge;
  40 +import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
34 41 import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
35 42 import org.thingsboard.server.common.data.id.CustomerId;
36 43 import org.thingsboard.server.common.data.id.EdgeId;
... ... @@ -38,23 +45,31 @@ import org.thingsboard.server.common.data.id.EntityId;
38 45 import org.thingsboard.server.common.data.id.TenantId;
39 46 import org.thingsboard.server.common.data.page.TextPageData;
40 47 import org.thingsboard.server.common.data.page.TextPageLink;
  48 +import org.thingsboard.server.common.data.page.TimePageData;
  49 +import org.thingsboard.server.common.data.page.TimePageLink;
41 50 import org.thingsboard.server.common.data.relation.EntityRelation;
42 51 import org.thingsboard.server.common.data.relation.EntitySearchDirection;
  52 +import org.thingsboard.server.common.data.rule.RuleChain;
  53 +import org.thingsboard.server.common.msg.TbMsg;
43 54 import org.thingsboard.server.dao.customer.CustomerDao;
44 55 import org.thingsboard.server.dao.dashboard.DashboardService;
45 56 import org.thingsboard.server.dao.entity.AbstractEntityService;
  57 +import org.thingsboard.server.dao.event.EventService;
46 58 import org.thingsboard.server.dao.exception.DataValidationException;
  59 +import org.thingsboard.server.dao.rule.RuleChainService;
47 60 import org.thingsboard.server.dao.service.DataValidator;
48 61 import org.thingsboard.server.dao.service.PaginatedRemover;
49 62 import org.thingsboard.server.dao.service.Validator;
50 63 import org.thingsboard.server.dao.tenant.TenantDao;
51 64
52 65 import javax.annotation.Nullable;
  66 +import java.io.IOException;
53 67 import java.util.ArrayList;
54 68 import java.util.Collections;
55 69 import java.util.Comparator;
56 70 import java.util.List;
57 71 import java.util.Optional;
  72 +import java.util.UUID;
58 73 import java.util.stream.Collectors;
59 74
60 75 import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE;
... ... @@ -69,6 +84,8 @@ import static org.thingsboard.server.dao.service.Validator.validateString;
69 84 @Slf4j
70 85 public class BaseEdgeService extends AbstractEntityService implements EdgeService {
71 86
  87 + private static final ObjectMapper mapper = new ObjectMapper();
  88 +
72 89 public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
73 90 public static final String INCORRECT_PAGE_LINK = "Incorrect page link ";
74 91 public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId ";
... ... @@ -87,8 +104,14 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
87 104 private CacheManager cacheManager;
88 105
89 106 @Autowired
  107 + private EventService eventService;
  108 +
  109 + @Autowired
90 110 private DashboardService dashboardService;
91 111
  112 + @Autowired
  113 + private RuleChainService ruleChainService;
  114 +
92 115 @Override
93 116 public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) {
94 117 log.trace("Executing findEdgeById [{}]", edgeId);
... ... @@ -150,7 +173,8 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
150 173
151 174 Edge edge = edgeDao.findById(tenantId, edgeId.getId());
152 175
153   - deleteEntityRelations(tenantId, edgeId);
  176 + dashboardService.unassignEdgeDashboards(tenantId, edgeId);
  177 + ruleChainService.unassignEdgeRuleChains(tenantId, edgeId);
154 178
155 179 List<Object> list = new ArrayList<>();
156 180 list.add(edge.getTenantId());
... ... @@ -158,7 +182,7 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
158 182 Cache cache = cacheManager.getCache(EDGE_CACHE);
159 183 cache.evict(list);
160 184
161   - dashboardService.unassignEdgeDashboards(tenantId, edgeId);
  185 + deleteEntityRelations(tenantId, edgeId);
162 186
163 187 edgeDao.removeById(tenantId, edgeId.getId());
164 188 }
... ... @@ -275,6 +299,106 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
275 299 });
276 300 }
277 301
  302 + @Override
  303 + public void pushEventToEdge(TenantId tenantId, TbMsg tbMsg) {
  304 + try {
  305 + switch (tbMsg.getOriginator().getEntityType()) {
  306 + case ASSET:
  307 + processAsset(tenantId, tbMsg);
  308 + break;
  309 + case DEVICE:
  310 + processDevice(tenantId, tbMsg);
  311 + break;
  312 + case DASHBOARD:
  313 + processDashboard(tenantId, tbMsg);
  314 + break;
  315 + case RULE_CHAIN:
  316 + processRuleChain(tenantId, tbMsg);
  317 + break;
  318 + case ENTITY_VIEW:
  319 + processEntityView(tenantId, tbMsg);
  320 + break;
  321 + default:
  322 + log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType());
  323 + }
  324 + } catch (IOException e) {
  325 + log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e);
  326 + }
  327 +
  328 +
  329 + }
  330 +
  331 + private void processDevice(TenantId tenantId, TbMsg tbMsg) {
  332 + // TODO
  333 + }
  334 +
  335 + private void processDashboard(TenantId tenantId, TbMsg tbMsg) {
  336 + processAssignedEntity(tenantId, tbMsg, EntityType.DASHBOARD);
  337 + }
  338 +
  339 + private void processEntityView(TenantId tenantId, TbMsg tbMsg) {
  340 + // TODO
  341 + }
  342 +
  343 + private void processAsset(TenantId tenantId, TbMsg tbMsg) {
  344 + // TODO
  345 + }
  346 +
  347 + private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EntityType entityType) {
  348 + EdgeId edgeId;
  349 + switch (tbMsg.getType()) {
  350 + case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
  351 + edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId")));
  352 + pushEventToEdge(tenantId, edgeId, tbMsg.getType(), entityType, tbMsg.getData());
  353 + break;
  354 + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
  355 + edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId")));
  356 + pushEventToEdge(tenantId, edgeId, tbMsg.getType(), entityType, tbMsg.getData());
  357 + break;
  358 + }
  359 + }
  360 +
  361 + private void processRuleChain(TenantId tenantId, TbMsg tbMsg) throws IOException {
  362 + switch (tbMsg.getType()) {
  363 + case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
  364 + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
  365 + processAssignedEntity(tenantId, tbMsg, EntityType.RULE_CHAIN);
  366 + break;
  367 + case DataConstants.ENTITY_DELETED:
  368 + case DataConstants.ENTITY_CREATED:
  369 + case DataConstants.ENTITY_UPDATED:
  370 + RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
  371 + for (ShortEdgeInfo assignedEdge : ruleChain.getAssignedEdges()) {
  372 + pushEventToEdge(tenantId, assignedEdge.getEdgeId(), tbMsg.getType(), EntityType.RULE_CHAIN, tbMsg.getData());
  373 + }
  374 + break;
  375 + default:
  376 + log.warn("Unsupported message type " + tbMsg.getType());
  377 + }
  378 +
  379 + }
  380 +
  381 + private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, String type, EntityType entityType, String data) {
  382 + log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type [{}], data [{}]", tenantId, edgeId, type, data);
  383 +
  384 + EdgeQueueEntry queueEntry = new EdgeQueueEntry();
  385 + queueEntry.setType(type);
  386 + queueEntry.setEntityType(entityType);
  387 + queueEntry.setData(data);
  388 +
  389 + Event event = new Event();
  390 + event.setEntityId(edgeId);
  391 + event.setTenantId(tenantId);
  392 + event.setType(DataConstants.EDGE_QUEUE_EVENT_TYPE);
  393 + event.setBody(mapper.valueToTree(queueEntry));
  394 + eventService.saveAsync(event);
  395 + }
  396 +
  397 + @Override
  398 + public TimePageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) {
  399 + return eventService.findEvents(tenantId, edgeId, DataConstants.EDGE_QUEUE_EVENT_TYPE, pageLink);
  400 + }
  401 +
278 402 private DataValidator<Edge> edgeValidator =
279 403 new DataValidator<Edge>() {
280 404
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.dao.rule;
17 17
  18 +import com.fasterxml.jackson.databind.ObjectMapper;
18 19 import com.google.common.base.Function;
19 20 import com.google.common.util.concurrent.Futures;
20 21 import com.google.common.util.concurrent.ListenableFuture;
... ... @@ -43,6 +44,7 @@ import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
43 44 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
44 45 import org.thingsboard.server.common.data.rule.RuleNode;
45 46 import org.thingsboard.server.dao.edge.EdgeDao;
  47 +import org.thingsboard.server.dao.edge.EdgeService;
46 48 import org.thingsboard.server.dao.entity.AbstractEntityService;
47 49 import org.thingsboard.server.dao.exception.DataValidationException;
48 50 import org.thingsboard.server.dao.service.DataValidator;
... ... @@ -66,6 +68,8 @@ import java.util.stream.Collectors;
66 68 @Slf4j
67 69 public class BaseRuleChainService extends AbstractEntityService implements RuleChainService {
68 70
  71 + private static final ObjectMapper objectMapper = new ObjectMapper();
  72 +
69 73 @Autowired
70 74 private RuleChainDao ruleChainDao;
71 75
... ... @@ -78,6 +82,9 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
78 82 @Autowired
79 83 private EdgeDao edgeDao;
80 84
  85 + @Autowired
  86 + private EdgeService edgeService;
  87 +
81 88 @Override
82 89 public RuleChain saveRuleChain(RuleChain ruleChain) {
83 90 ruleChainValidator.validate(ruleChain, RuleChain::getTenantId);
... ... @@ -381,10 +388,9 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
381 388 log.warn("[{}] Failed to create ruleChain relation. Edge Id: [{}]", ruleChainId, edgeId);
382 389 throw new RuntimeException(e);
383 390 }
384   - return saveRuleChain(ruleChain);
385   - } else {
386   - return ruleChain;
  391 + ruleChain = saveRuleChain(ruleChain);
387 392 }
  393 + return ruleChain;
388 394 }
389 395
390 396 @Override
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...