Commit c99cf51ca644702bd15f8afc7ec47f763cdf32fb

Authored by Volodymyr Babak
1 parent efd565d8

Added push to edge/cloud functionality

Showing 17 changed files with 637 additions and 148 deletions
... ... @@ -59,6 +59,7 @@ import org.thingsboard.server.dao.cassandra.CassandraCluster;
59 59 import org.thingsboard.server.dao.customer.CustomerService;
60 60 import org.thingsboard.server.dao.dashboard.DashboardService;
61 61 import org.thingsboard.server.dao.device.DeviceService;
  62 +import org.thingsboard.server.dao.edge.EdgeService;
62 63 import org.thingsboard.server.dao.entityview.EntityViewService;
63 64 import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor;
64 65 import org.thingsboard.server.dao.nosql.CassandraStatementTask;
... ... @@ -305,6 +306,11 @@ class DefaultTbContext implements TbContext {
305 306 }
306 307
307 308 @Override
  309 + public EdgeService getEdgeService() {
  310 + return mainCtx.getEdgeService();
  311 + }
  312 +
  313 + @Override
308 314 public EventLoopGroup getSharedEventLoop() {
309 315 return mainCtx.getSharedEventLoopGroupService().getSharedEventLoopGroup();
310 316 }
... ...
... ... @@ -23,6 +23,7 @@ import com.datastax.driver.core.utils.UUIDs;
23 23
24 24 import java.util.Optional;
25 25
  26 +import com.google.common.util.concurrent.FutureCallback;
26 27 import lombok.extern.slf4j.Slf4j;
27 28 import org.thingsboard.server.actors.ActorSystemContext;
28 29 import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg;
... ... @@ -49,6 +50,7 @@ import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
49 50 import org.thingsboard.server.dao.edge.EdgeService;
50 51 import org.thingsboard.server.dao.rule.RuleChainService;
51 52
  53 +import javax.annotation.Nullable;
52 54 import java.util.ArrayList;
53 55 import java.util.Collections;
54 56 import java.util.HashMap;
... ... @@ -348,7 +350,17 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
348 350 case DataConstants.ENTITY_DELETED:
349 351 case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
350 352 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
351   - edgeService.pushEventToEdge(tenantId, msg);
  353 + edgeService.pushEventToEdge(tenantId, msg, new FutureCallback<Void>() {
  354 + @Override
  355 + public void onSuccess(@Nullable Void aVoid) {
  356 + log.debug("Event saved successfully!");
  357 + }
  358 +
  359 + @Override
  360 + public void onFailure(Throwable t) {
  361 + log.debug("Failure during event save", t);
  362 + }
  363 + });
352 364 }
353 365
354 366 }
... ...
... ... @@ -17,13 +17,50 @@ package org.thingsboard.server.service.edge;
17 17
18 18 import lombok.Data;
19 19 import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.context.annotation.Lazy;
20 21 import org.springframework.stereotype.Component;
  22 +import org.thingsboard.server.actors.service.ActorService;
  23 +import org.thingsboard.server.dao.asset.AssetService;
  24 +import org.thingsboard.server.dao.attributes.AttributesService;
  25 +import org.thingsboard.server.dao.customer.CustomerService;
  26 +import org.thingsboard.server.dao.device.DeviceService;
21 27 import org.thingsboard.server.dao.edge.EdgeService;
  28 +import org.thingsboard.server.dao.entityview.EntityViewService;
  29 +import org.thingsboard.server.dao.relation.RelationService;
22 30
23 31 @Component
24 32 @Data
25 33 public class EdgeContextComponent {
26 34
  35 + @Lazy
27 36 @Autowired
28 37 private EdgeService edgeService;
  38 +
  39 + @Lazy
  40 + @Autowired
  41 + private AssetService assetService;
  42 +
  43 + @Lazy
  44 + @Autowired
  45 + private DeviceService deviceService;
  46 +
  47 + @Lazy
  48 + @Autowired
  49 + private EntityViewService entityViewService;
  50 +
  51 + @Lazy
  52 + @Autowired
  53 + private AttributesService attributesService;
  54 +
  55 + @Lazy
  56 + @Autowired
  57 + private CustomerService customerService;
  58 +
  59 + @Lazy
  60 + @Autowired
  61 + private RelationService relationService;
  62 +
  63 + @Lazy
  64 + @Autowired
  65 + private ActorService actorService;
29 66 }
... ...
... ... @@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired;
25 25 import org.springframework.beans.factory.annotation.Value;
26 26 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
27 27 import org.springframework.stereotype.Service;
  28 +import org.thingsboard.server.actors.service.ActorService;
28 29 import org.thingsboard.server.common.data.Event;
29 30 import org.thingsboard.server.common.data.edge.Edge;
30 31 import org.thingsboard.server.common.data.id.DeviceId;
... ... @@ -36,6 +37,7 @@ import org.thingsboard.server.dao.asset.AssetService;
36 37 import org.thingsboard.server.dao.attributes.AttributesService;
37 38 import org.thingsboard.server.dao.device.DeviceService;
38 39 import org.thingsboard.server.dao.edge.EdgeService;
  40 +import org.thingsboard.server.dao.entityview.EntityViewService;
39 41 import org.thingsboard.server.dao.event.EventService;
40 42 import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
41 43 import org.thingsboard.server.gen.edge.RequestMsg;
... ... @@ -82,8 +84,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
82 84 private DeviceService deviceService;
83 85
84 86 @Autowired
  87 + private EntityViewService entityViewService;
  88 +
  89 + @Autowired
85 90 private AttributesService attributesService;
86 91
  92 + @Autowired
  93 + private ActorService actorService;
  94 +
87 95 private Server server;
88 96
89 97 private ExecutorService executor;
... ... @@ -124,7 +132,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
124 132
125 133 @Override
126 134 public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
127   - return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, edgeService, assetService, deviceService, attributesService, objectMapper).getInputStream();
  135 + return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, objectMapper).getInputStream();
128 136 }
129 137
130 138 private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
... ...
... ... @@ -18,8 +18,10 @@ package org.thingsboard.server.service.edge.rpc;
18 18 import com.datastax.driver.core.utils.UUIDs;
19 19 import com.fasterxml.jackson.core.JsonProcessingException;
20 20 import com.fasterxml.jackson.databind.ObjectMapper;
  21 +import com.fasterxml.jackson.databind.node.ObjectNode;
21 22 import com.google.common.util.concurrent.Futures;
22 23 import com.google.common.util.concurrent.ListenableFuture;
  24 +import com.google.protobuf.ByteString;
23 25 import io.grpc.stub.StreamObserver;
24 26 import lombok.Data;
25 27 import lombok.extern.slf4j.Slf4j;
... ... @@ -33,23 +35,29 @@ import org.thingsboard.server.common.data.asset.Asset;
33 35 import org.thingsboard.server.common.data.edge.Edge;
34 36 import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
35 37 import org.thingsboard.server.common.data.id.AssetId;
  38 +import org.thingsboard.server.common.data.id.CustomerId;
36 39 import org.thingsboard.server.common.data.id.DeviceId;
37 40 import org.thingsboard.server.common.data.id.EdgeId;
  41 +import org.thingsboard.server.common.data.id.EntityId;
  42 +import org.thingsboard.server.common.data.id.EntityViewId;
38 43 import org.thingsboard.server.common.data.id.TenantId;
39 44 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
40 45 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
41 46 import org.thingsboard.server.common.data.kv.LongDataEntry;
42 47 import org.thingsboard.server.common.data.page.TimePageData;
43 48 import org.thingsboard.server.common.data.page.TimePageLink;
  49 +import org.thingsboard.server.common.data.relation.EntityRelation;
  50 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
44 51 import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
45 52 import org.thingsboard.server.common.data.rule.RuleChain;
46 53 import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
47 54 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
48 55 import org.thingsboard.server.common.data.rule.RuleNode;
49   -import org.thingsboard.server.dao.asset.AssetService;
50   -import org.thingsboard.server.dao.attributes.AttributesService;
51   -import org.thingsboard.server.dao.device.DeviceService;
52   -import org.thingsboard.server.dao.edge.EdgeService;
  56 +import org.thingsboard.server.common.msg.TbMsg;
  57 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  58 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
  59 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  60 +import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
53 61 import org.thingsboard.server.dao.util.mapping.JacksonUtil;
54 62 import org.thingsboard.server.gen.edge.AssetUpdateMsg;
55 63 import org.thingsboard.server.gen.edge.ConnectRequestMsg;
... ... @@ -57,7 +65,9 @@ import org.thingsboard.server.gen.edge.ConnectResponseCode;
57 65 import org.thingsboard.server.gen.edge.ConnectResponseMsg;
58 66 import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
59 67 import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
  68 +import org.thingsboard.server.gen.edge.DownlinkMsg;
60 69 import org.thingsboard.server.gen.edge.EdgeConfiguration;
  70 +import org.thingsboard.server.gen.edge.EntityDataProto;
61 71 import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
62 72 import org.thingsboard.server.gen.edge.NodeConnectionInfoProto;
63 73 import org.thingsboard.server.gen.edge.RequestMsg;
... ... @@ -72,12 +82,14 @@ import org.thingsboard.server.gen.edge.UplinkMsg;
72 82 import org.thingsboard.server.gen.edge.UplinkResponseMsg;
73 83 import org.thingsboard.server.service.edge.EdgeContextComponent;
74 84
  85 +import java.io.IOException;
75 86 import java.util.ArrayList;
76 87 import java.util.Collections;
77 88 import java.util.List;
78 89 import java.util.Optional;
79 90 import java.util.UUID;
80 91 import java.util.concurrent.ExecutionException;
  92 +import java.util.concurrent.locks.ReentrantLock;
81 93 import java.util.function.BiConsumer;
82 94 import java.util.function.Consumer;
83 95
... ... @@ -85,6 +97,8 @@ import java.util.function.Consumer;
85 97 @Data
86 98 public final class EdgeGrpcSession implements Cloneable {
87 99
  100 + private static final ReentrantLock entityCreationLock = new ReentrantLock();
  101 +
88 102 private final UUID sessionId;
89 103 private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
90 104 private final Consumer<EdgeId> sessionCloseListener;
... ... @@ -96,24 +110,14 @@ public final class EdgeGrpcSession implements Cloneable {
96 110 private StreamObserver<ResponseMsg> outputStream;
97 111 private boolean connected;
98 112
99   - private EdgeService edgeService;
100   - private AssetService assetService;
101   - private DeviceService deviceService;
102   - private AttributesService attributesService;
103   -
104   - EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
105   - BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener, Consumer<EdgeId> sessionCloseListener,
106   - EdgeService edgeService, AssetService assetService, DeviceService deviceService, AttributesService attributesService, ObjectMapper objectMapper) {
  113 + EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
  114 + Consumer<EdgeId> sessionCloseListener, ObjectMapper objectMapper) {
107 115 this.sessionId = UUID.randomUUID();
108 116 this.ctx = ctx;
109 117 this.outputStream = outputStream;
110 118 this.sessionOpenListener = sessionOpenListener;
111 119 this.sessionCloseListener = sessionCloseListener;
112 120 this.objectMapper = objectMapper;
113   - this.edgeService = edgeService;
114   - this.assetService = assetService;
115   - this.deviceService = deviceService;
116   - this.attributesService = attributesService;
117 121 initInputStream();
118 122 }
119 123
... ... @@ -136,6 +140,11 @@ public final class EdgeGrpcSession implements Cloneable {
136 140 .setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg()))
137 141 .build());
138 142 }
  143 + if (requestMsg.getMsgType().equals(RequestMsgType.DEVICE_UPDATE_RPC_MESSAGE) && requestMsg.hasDeviceUpdateMsg()) {
  144 + outputStream.onNext(ResponseMsg.newBuilder()
  145 + .setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg()))
  146 + .build());
  147 + }
139 148 }
140 149 }
141 150
... ... @@ -159,37 +168,23 @@ public final class EdgeGrpcSession implements Cloneable {
159 168 TimePageData<Event> pageData;
160 169 UUID ifOffset = null;
161 170 do {
162   - pageData = edgeService.findQueueEvents(edge.getTenantId(), edge.getId(), pageLink);
  171 + pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink);
163 172 if (!pageData.getData().isEmpty()) {
  173 + edge = ctx.getEdgeService().findEdgeById(edge.getTenantId(), edge.getId());
164 174 for (Event event : pageData.getData()) {
165 175 EdgeQueueEntry entry;
166 176 try {
167 177 entry = objectMapper.treeToValue(event.getBody(), EdgeQueueEntry.class);
  178 +
168 179 UpdateMsgType msgType = getResponseMsgType(entry.getType());
169   - switch (entry.getEntityType()) {
170   - case DEVICE:
171   - Device device = objectMapper.readValue(entry.getData(), Device.class);
172   - onDeviceUpdated(msgType, device);
  180 + switch (msgType) {
  181 + case ENTITY_DELETED_RPC_MESSAGE:
  182 + case ENTITY_UPDATED_RPC_MESSAGE:
  183 + case ENTITY_CREATED_RPC_MESSAGE:
  184 + processEntityCRUDMessage(entry, msgType);
173 185 break;
174   - case ASSET:
175   - Asset asset = objectMapper.readValue(entry.getData(), Asset.class);
176   - onAssetUpdated(msgType, asset);
177   - break;
178   - case ENTITY_VIEW:
179   - EntityView entityView = objectMapper.readValue(entry.getData(), EntityView.class);
180   - onEntityViewUpdated(msgType, entityView);
181   - break;
182   - case DASHBOARD:
183   - Dashboard dashboard = objectMapper.readValue(entry.getData(), Dashboard.class);
184   - onDashboardUpdated(msgType, dashboard);
185   - break;
186   - case RULE_CHAIN:
187   - RuleChain ruleChain = objectMapper.readValue(entry.getData(), RuleChain.class);
188   - onRuleChainUpdated(msgType, ruleChain);
189   - break;
190   - case RULE_CHAIN_METADATA:
191   - RuleChainMetaData ruleChainMetaData = objectMapper.readValue(entry.getData(), RuleChainMetaData.class);
192   - onRuleChainMetadataUpdated(msgType, ruleChainMetaData);
  186 + case RULE_CHAIN_CUSTOM_MESSAGE:
  187 + processCustomDownlinkMessage(entry);
193 188 break;
194 189 }
195 190 } catch (Exception e) {
... ... @@ -214,14 +209,71 @@ public final class EdgeGrpcSession implements Cloneable {
214 209 }
215 210 }
216 211
  212 + private void processCustomDownlinkMessage(EdgeQueueEntry entry) throws IOException {
  213 + log.trace("Executing processCustomDownlinkMessage, entry [{}], msgType [{}]", entry);
  214 + TbMsg tbMsg = objectMapper.readValue(entry.getData(), TbMsg.class);
  215 + String entityName = null;
  216 + switch (entry.getEntityType()) {
  217 + case DEVICE:
  218 + Device device = ctx.getDeviceService().findDeviceById(edge.getTenantId(), new DeviceId(tbMsg.getOriginator().getId()));
  219 + entityName = device.getName();
  220 + break;
  221 + case ASSET:
  222 + Asset asset = ctx.getAssetService().findAssetById(edge.getTenantId(), new AssetId(tbMsg.getOriginator().getId()));
  223 + entityName = asset.getName();
  224 + break;
  225 + case ENTITY_VIEW:
  226 + EntityView entityView = ctx.getEntityViewService().findEntityViewById(edge.getTenantId(), new EntityViewId(tbMsg.getOriginator().getId()));
  227 + entityName = entityView.getName();
  228 + break;
  229 +
  230 + }
  231 + if (entityName != null) {
  232 + log.debug("Sending donwlink entity data msg, entityName [{}], tbMsg [{}]", entityName, tbMsg);
  233 + outputStream.onNext(ResponseMsg.newBuilder()
  234 + .setDownlinkMsg(constructDownlinkEntityDataMsg(entityName, tbMsg))
  235 + .build());
  236 + }
  237 + }
  238 +
  239 + private void processEntityCRUDMessage(EdgeQueueEntry entry, UpdateMsgType msgType) throws java.io.IOException {
  240 + log.trace("Executing processEntityCRUDMessage, entry [{}], msgType [{}]", entry, msgType);
  241 + switch (entry.getEntityType()) {
  242 + case DEVICE:
  243 + Device device = objectMapper.readValue(entry.getData(), Device.class);
  244 + onDeviceUpdated(msgType, device);
  245 + break;
  246 + case ASSET:
  247 + Asset asset = objectMapper.readValue(entry.getData(), Asset.class);
  248 + onAssetUpdated(msgType, asset);
  249 + break;
  250 + case ENTITY_VIEW:
  251 + EntityView entityView = objectMapper.readValue(entry.getData(), EntityView.class);
  252 + onEntityViewUpdated(msgType, entityView);
  253 + break;
  254 + case DASHBOARD:
  255 + Dashboard dashboard = objectMapper.readValue(entry.getData(), Dashboard.class);
  256 + onDashboardUpdated(msgType, dashboard);
  257 + break;
  258 + case RULE_CHAIN:
  259 + RuleChain ruleChain = objectMapper.readValue(entry.getData(), RuleChain.class);
  260 + onRuleChainUpdated(msgType, ruleChain);
  261 + break;
  262 + case RULE_CHAIN_METADATA:
  263 + RuleChainMetaData ruleChainMetaData = objectMapper.readValue(entry.getData(), RuleChainMetaData.class);
  264 + onRuleChainMetadataUpdated(msgType, ruleChainMetaData);
  265 + break;
  266 + }
  267 + }
  268 +
217 269 private void updateQueueStartTs(Long newStartTs) {
218 270 List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry("queueStartTs", newStartTs), System.currentTimeMillis()));
219   - attributesService.save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
  271 + ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
220 272 }
221 273
222 274 private ListenableFuture<Long> getQueueStartTs() {
223 275 ListenableFuture<Optional<AttributeKvEntry>> future =
224   - attributesService.find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, "queueStartTs");
  276 + ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, "queueStartTs");
225 277 return Futures.transform(future, attributeKvEntryOpt -> {
226 278 if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
227 279 AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();
... ... @@ -272,17 +324,24 @@ public final class EdgeGrpcSession implements Cloneable {
272 324 }
273 325
274 326 private UpdateMsgType getResponseMsgType(String msgType) {
275   - switch (msgType) {
276   - case DataConstants.ENTITY_UPDATED:
277   - return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
278   - case DataConstants.ENTITY_CREATED:
279   - case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
280   - return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
281   - case DataConstants.ENTITY_DELETED:
282   - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
283   - return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
284   - default:
285   - throw new RuntimeException("Unsupported mstType [" + msgType + "]");
  327 + if (msgType.equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) ||
  328 + msgType.equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ||
  329 + msgType.equals(DataConstants.ATTRIBUTES_UPDATED) ||
  330 + msgType.equals(DataConstants.ATTRIBUTES_DELETED)) {
  331 + return UpdateMsgType.RULE_CHAIN_CUSTOM_MESSAGE;
  332 + } else {
  333 + switch (msgType) {
  334 + case DataConstants.ENTITY_UPDATED:
  335 + return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
  336 + case DataConstants.ENTITY_CREATED:
  337 + case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
  338 + return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
  339 + case DataConstants.ENTITY_DELETED:
  340 + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
  341 + return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
  342 + default:
  343 + throw new RuntimeException("Unsupported msgType [" + msgType + "]");
  344 + }
286 345 }
287 346 }
288 347
... ... @@ -292,7 +351,7 @@ public final class EdgeGrpcSession implements Cloneable {
292 351 .setIdMSB(ruleChain.getId().getId().getMostSignificantBits())
293 352 .setIdLSB(ruleChain.getId().getId().getLeastSignificantBits())
294 353 .setName(ruleChain.getName())
295   - .setRoot(ruleChain.isRoot())
  354 + .setRoot(ruleChain.getId().equals(edge.getRootRuleChainId()))
296 355 .setDebugMode(ruleChain.isDebugMode())
297 356 .setConfiguration(JacksonUtil.toString(ruleChain.getConfiguration()));
298 357 if (ruleChain.getFirstRuleNodeId() != null) {
... ... @@ -302,6 +361,17 @@ public final class EdgeGrpcSession implements Cloneable {
302 361 return builder.build();
303 362 }
304 363
  364 + private DownlinkMsg constructDownlinkEntityDataMsg(String entityName, TbMsg tbMsg) {
  365 + EntityDataProto entityData = EntityDataProto.newBuilder()
  366 + .setEntityName(entityName)
  367 + .setTbMsg(ByteString.copyFrom(TbMsg.toBytes(tbMsg))).build();
  368 +
  369 + DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
  370 + .addAllEntityData(Collections.singletonList(entityData));
  371 +
  372 + return builder.build();
  373 + }
  374 +
305 375 private RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
306 376 try {
307 377 RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder()
... ... @@ -409,12 +479,12 @@ public final class EdgeGrpcSession implements Cloneable {
409 479 String relatedType;
410 480 org.thingsboard.server.gen.edge.EntityType relatedEntityType;
411 481 if (entityView.getEntityId().getEntityType().equals(EntityType.DEVICE)) {
412   - Device device = deviceService.findDeviceById(entityView.getTenantId(), new DeviceId(entityView.getEntityId().getId()));
  482 + Device device = ctx.getDeviceService().findDeviceById(entityView.getTenantId(), new DeviceId(entityView.getEntityId().getId()));
413 483 relatedName = device.getName();
414 484 relatedType = device.getType();
415 485 relatedEntityType = org.thingsboard.server.gen.edge.EntityType.DEVICE;
416 486 } else {
417   - Asset asset = assetService.findAssetById(entityView.getTenantId(), new AssetId(entityView.getEntityId().getId()));
  487 + Asset asset = ctx.getAssetService().findAssetById(entityView.getTenantId(), new AssetId(entityView.getEntityId().getId()));
418 488 relatedName = asset.getName();
419 489 relatedType = asset.getType();
420 490 relatedEntityType = org.thingsboard.server.gen.edge.EntityType.ASSET;
... ... @@ -430,7 +500,103 @@ public final class EdgeGrpcSession implements Cloneable {
430 500 }
431 501
432 502 private UplinkResponseMsg processUplinkMsg(UplinkMsg uplinkMsg) {
433   - return null;
  503 + try {
  504 + if (uplinkMsg.getEntityDataList() != null && !uplinkMsg.getEntityDataList().isEmpty()) {
  505 + for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
  506 + TbMsg tbMsg = null;
  507 + TbMsg tmp = TbMsg.fromBytes(entityData.getTbMsg().toByteArray());
  508 + switch (tmp.getOriginator().getEntityType()) {
  509 + case DEVICE:
  510 + String deviceName = entityData.getEntityName();
  511 + String deviceType = entityData.getEntityType();
  512 + Device device = getOrCreateDevice(deviceName, deviceType);
  513 + if (device != null) {
  514 + tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), device.getId(), tmp.getMetaData().copy(),
  515 + tmp.getDataType(), tmp.getData(), null, null, 0L);
  516 + }
  517 + break;
  518 + case ASSET:
  519 + String assetName = entityData.getEntityName();
  520 + Asset asset = ctx.getAssetService().findAssetByTenantIdAndName(edge.getTenantId(), assetName);
  521 + if (asset != null) {
  522 + tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), asset.getId(), tmp.getMetaData().copy(),
  523 + tmp.getDataType(), tmp.getData(), null, null, 0L);
  524 + }
  525 + break;
  526 + case ENTITY_VIEW:
  527 + String entityViewName = entityData.getEntityName();
  528 + EntityView entityView = ctx.getEntityViewService().findEntityViewByTenantIdAndName(edge.getTenantId(), entityViewName);
  529 + if (entityView != null) {
  530 + tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), entityView.getId(), tmp.getMetaData().copy(),
  531 + tmp.getDataType(), tmp.getData(), null, null, 0L);
  532 + }
  533 + break;
  534 + }
  535 + if (tbMsg != null) {
  536 + ctx.getActorService().onMsg(new SendToClusterMsg(tbMsg.getOriginator(), new ServiceToRuleEngineMsg(edge.getTenantId(), tbMsg)));
  537 + }
  538 + }
  539 + }
  540 + } catch (Exception e) {
  541 + return UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(e.getMessage()).build();
  542 + }
  543 +
  544 + return UplinkResponseMsg.newBuilder().setSuccess(true).build();
  545 + }
  546 +
  547 + private Device getOrCreateDevice(String deviceName, String deviceType) {
  548 + Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);
  549 + if (device == null) {
  550 + entityCreationLock.lock();
  551 + try {
  552 + return processGetOrCreateDevice(deviceName, deviceType);
  553 + } finally {
  554 + entityCreationLock.unlock();
  555 + }
  556 + }
  557 + return device;
  558 + }
  559 +
  560 + private Device processGetOrCreateDevice(String deviceName, String deviceType) {
  561 + Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);
  562 + if (device == null) {
  563 + device = new Device();
  564 + device.setName(deviceName);
  565 + device.setType(deviceType);
  566 + device.setTenantId(edge.getTenantId());
  567 + device.setCustomerId(edge.getCustomerId());
  568 + device = ctx.getDeviceService().saveDevice(device);
  569 + createRelationFromEdge(device.getId());
  570 + ctx.getActorService().onDeviceAdded(device);
  571 + pushDeviceCreatedEventToRuleEngine(device);
  572 + }
  573 + return device;
  574 + }
  575 +
  576 + private void pushDeviceCreatedEventToRuleEngine(Device device) {
  577 + try {
  578 + ObjectNode entityNode = objectMapper.valueToTree(device);
  579 + TbMsg msg = new TbMsg(UUIDs.timeBased(), DataConstants.ENTITY_CREATED, device.getId(), deviceActionTbMsgMetaData(device), objectMapper.writeValueAsString(entityNode), null, null, 0L);
  580 + ctx.getActorService().onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(edge.getTenantId(), msg)));
  581 + } catch (JsonProcessingException | IllegalArgumentException e) {
  582 + log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e);
  583 + }
  584 + }
  585 +
  586 + private TbMsgMetaData deviceActionTbMsgMetaData(Device device) {
  587 + TbMsgMetaData metaData = getTbMsgMetaData();
  588 + CustomerId customerId = device.getCustomerId();
  589 + if (customerId != null && !customerId.isNullUid()) {
  590 + metaData.putValue("customerId", customerId.toString());
  591 + }
  592 + return metaData;
  593 + }
  594 +
  595 + private TbMsgMetaData getTbMsgMetaData() {
  596 + TbMsgMetaData metaData = new TbMsgMetaData();
  597 + metaData.putValue("edgeId", edge.getId().toString());
  598 + metaData.putValue("edgeName", edge.getName());
  599 + return metaData;
434 600 }
435 601
436 602 private ConnectResponseMsg processConnect(ConnectRequestMsg request) {
... ... @@ -464,6 +630,15 @@ public final class EdgeGrpcSession implements Cloneable {
464 630 .setConfiguration(EdgeConfiguration.getDefaultInstance()).build();
465 631 }
466 632
  633 + private void createRelationFromEdge(EntityId entityId) {
  634 + EntityRelation relation = new EntityRelation();
  635 + relation.setFrom(edge.getId());
  636 + relation.setTo(entityId);
  637 + relation.setTypeGroup(RelationTypeGroup.COMMON);
  638 + relation.setType(EntityRelation.EDGE_TYPE);
  639 + ctx.getRelationService().saveRelation(edge.getTenantId(), relation);
  640 + }
  641 +
467 642 private EdgeConfiguration constructEdgeConfigProto(Edge edge) throws JsonProcessingException {
468 643 return EdgeConfiguration.newBuilder()
469 644 .setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits())
... ...
... ... @@ -15,6 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.dao.edge;
17 17
  18 +import com.fasterxml.jackson.core.JsonProcessingException;
  19 +import com.google.common.util.concurrent.FutureCallback;
18 20 import com.google.common.util.concurrent.ListenableFuture;
19 21 import org.thingsboard.server.common.data.EntitySubtype;
20 22 import org.thingsboard.server.common.data.Event;
... ... @@ -31,6 +33,7 @@ import org.thingsboard.server.common.data.page.TimePageData;
31 33 import org.thingsboard.server.common.data.page.TimePageLink;
32 34 import org.thingsboard.server.common.msg.TbMsg;
33 35
  36 +import java.io.IOException;
34 37 import java.util.List;
35 38 import java.util.Optional;
36 39
... ... @@ -72,11 +75,11 @@ public interface EdgeService {
72 75
73 76 ListenableFuture<List<EntitySubtype>> findEdgeTypesByTenantId(TenantId tenantId);
74 77
75   - void pushEventToEdge(TenantId tenantId, TbMsg tbMsg);
  78 + void pushEventToEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback);
76 79
77 80 TimePageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink);
78 81
79   - Edge setRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId);
  82 + Edge setRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException;
80 83 }
81 84
82 85
... ...
... ... @@ -32,6 +32,7 @@ public class EntityRelation implements Serializable {
32 32
33 33 private static final long serialVersionUID = 2807343040519543363L;
34 34
  35 + public static final String EDGE_TYPE = "ManagedByEdge";
35 36 public static final String CONTAINS_TYPE = "Contains";
36 37 public static final String MANAGES_TYPE = "Manages";
37 38
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.common.data.rule;
2 17
3 18 public enum RuleChainType {
... ...
... ... @@ -53,6 +53,7 @@ message ResponseMsg {
53 53 enum RequestMsgType {
54 54 CONNECT_RPC_MESSAGE = 0;
55 55 UPLINK_RPC_MESSAGE = 1;
  56 + DEVICE_UPDATE_RPC_MESSAGE = 2;
56 57 }
57 58
58 59 message ConnectRequestMsg {
... ... @@ -84,35 +85,12 @@ enum UpdateMsgType {
84 85 ENTITY_CREATED_RPC_MESSAGE = 0;
85 86 ENTITY_UPDATED_RPC_MESSAGE = 1;
86 87 ENTITY_DELETED_RPC_MESSAGE = 2;
  88 + RULE_CHAIN_CUSTOM_MESSAGE = 3;
87 89 }
88 90
89   -message DeviceData {
90   - string deviceName = 1;
91   - string deviceType = 2;
92   - bytes tbMsg = 3;
93   -}
94   -
95   -message AssetData {
96   - string assetName = 1;
97   - string assetType = 2;
98   - bytes tbMsg = 3;
99   -}
100   -
101   -message EntityViewData {
102   - string entityViewName = 1;
103   - string entityViewType = 2;
104   - bytes tbMsg = 3;
105   -}
106   -
107   -message RuleChainData {
108   - string ruleChainName = 1;
109   - string ruleChainType = 2;
110   - bytes tbMsg = 3;
111   -}
112   -
113   -message DashboardData {
114   - string dashboardName = 1;
115   - string dashboardType = 2;
  91 +message EntityDataProto {
  92 + string entityName = 1;
  93 + string entityType = 2;
116 94 bytes tbMsg = 3;
117 95 }
118 96
... ... @@ -199,11 +177,7 @@ enum EntityType {
199 177
200 178 message UplinkMsg {
201 179 int32 uplinkMsgId = 1;
202   - repeated DeviceData deviceData = 2;
203   - repeated AssetData assetData = 3;
204   - repeated EntityViewData entityViewData = 4;
205   - repeated RuleChainData ruleChainData = 5;
206   - repeated DashboardData dashboardData = 6;
  180 + repeated EntityDataProto entityData = 2;
207 181 }
208 182
209 183 message UplinkResponseMsg {
... ... @@ -213,10 +187,6 @@ message UplinkResponseMsg {
213 187
214 188 message DownlinkMsg {
215 189 int32 downlinkMsgId = 1;
216   - repeated DeviceData deviceData = 2;
217   - repeated AssetData assetData = 3;
218   - repeated EntityViewData entityViewData = 4;
219   - repeated RuleChainData ruleChainData = 5;
220   - repeated DashboardData dashboardData = 6;
  190 + repeated EntityDataProto entityData = 2;
221 191 }
222 192
... ...
... ... @@ -105,6 +105,10 @@ public final class TbMsg implements Serializable {
105 105 return ByteBuffer.wrap(bytes);
106 106 }
107 107
  108 + public static TbMsg fromBytes(byte[] data) {
  109 + return fromBytes(ByteBuffer.wrap(data));
  110 + }
  111 +
108 112 public static TbMsg fromBytes(ByteBuffer buffer) {
109 113 try {
110 114 MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(buffer.array());
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.edge;
17 17
18 18 import com.fasterxml.jackson.databind.ObjectMapper;
19 19 import com.google.common.base.Function;
  20 +import com.google.common.util.concurrent.FutureCallback;
20 21 import com.google.common.util.concurrent.Futures;
21 22 import com.google.common.util.concurrent.ListenableFuture;
22 23 import lombok.extern.slf4j.Slf4j;
... ... @@ -41,9 +42,12 @@ import org.thingsboard.server.common.data.edge.Edge;
41 42 import org.thingsboard.server.common.data.edge.EdgeQueueEntityType;
42 43 import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
43 44 import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
  45 +import org.thingsboard.server.common.data.id.AssetId;
44 46 import org.thingsboard.server.common.data.id.CustomerId;
  47 +import org.thingsboard.server.common.data.id.DeviceId;
45 48 import org.thingsboard.server.common.data.id.EdgeId;
46 49 import org.thingsboard.server.common.data.id.EntityId;
  50 +import org.thingsboard.server.common.data.id.EntityViewId;
47 51 import org.thingsboard.server.common.data.id.RuleChainId;
48 52 import org.thingsboard.server.common.data.id.TenantId;
49 53 import org.thingsboard.server.common.data.page.TextPageData;
... ... @@ -55,9 +59,13 @@ import org.thingsboard.server.common.data.relation.EntitySearchDirection;
55 59 import org.thingsboard.server.common.data.rule.RuleChain;
56 60 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
57 61 import org.thingsboard.server.common.msg.TbMsg;
  62 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  63 +import org.thingsboard.server.dao.asset.AssetService;
58 64 import org.thingsboard.server.dao.customer.CustomerDao;
59 65 import org.thingsboard.server.dao.dashboard.DashboardService;
  66 +import org.thingsboard.server.dao.device.DeviceService;
60 67 import org.thingsboard.server.dao.entity.AbstractEntityService;
  68 +import org.thingsboard.server.dao.entityview.EntityViewService;
61 69 import org.thingsboard.server.dao.event.EventService;
62 70 import org.thingsboard.server.dao.exception.DataValidationException;
63 71 import org.thingsboard.server.dao.rule.RuleChainService;
... ... @@ -67,6 +75,8 @@ import org.thingsboard.server.dao.service.Validator;
67 75 import org.thingsboard.server.dao.tenant.TenantDao;
68 76
69 77 import javax.annotation.Nullable;
  78 +import javax.annotation.PostConstruct;
  79 +import javax.annotation.PreDestroy;
70 80 import java.io.IOException;
71 81 import java.util.ArrayList;
72 82 import java.util.Collections;
... ... @@ -74,6 +84,8 @@ import java.util.Comparator;
74 84 import java.util.List;
75 85 import java.util.Optional;
76 86 import java.util.UUID;
  87 +import java.util.concurrent.ExecutorService;
  88 +import java.util.concurrent.Executors;
77 89 import java.util.stream.Collectors;
78 90
79 91 import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE;
... ... @@ -116,6 +128,30 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
116 128 @Autowired
117 129 private RuleChainService ruleChainService;
118 130
  131 + @Autowired
  132 + private DeviceService deviceService;
  133 +
  134 + @Autowired
  135 + private AssetService assetService;
  136 +
  137 + @Autowired
  138 + private EntityViewService entityViewService;
  139 +
  140 + private ExecutorService tsCallBackExecutor;
  141 +
  142 + @PostConstruct
  143 + public void initExecutor() {
  144 + tsCallBackExecutor = Executors.newSingleThreadExecutor();
  145 + }
  146 +
  147 + @PreDestroy
  148 + public void shutdownExecutor() {
  149 + if (tsCallBackExecutor != null) {
  150 + tsCallBackExecutor.shutdownNow();
  151 + }
  152 + }
  153 +
  154 +
119 155 @Override
120 156 public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) {
121 157 log.trace("Executing findEdgeById [{}]", edgeId);
... ... @@ -304,46 +340,80 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
304 340 }
305 341
306 342 @Override
307   - public void pushEventToEdge(TenantId tenantId, TbMsg tbMsg) {
308   - try {
309   - switch (tbMsg.getOriginator().getEntityType()) {
310   - case ASSET:
311   - processAsset(tenantId, tbMsg);
312   - break;
313   - case DEVICE:
314   - processDevice(tenantId, tbMsg);
315   - break;
316   - case DASHBOARD:
317   - processDashboard(tenantId, tbMsg);
318   - break;
319   - case RULE_CHAIN:
320   - processRuleChain(tenantId, tbMsg);
321   - break;
322   - case ENTITY_VIEW:
323   - processEntityView(tenantId, tbMsg);
324   - break;
325   - default:
326   - log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType());
  343 + public void pushEventToEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) {
  344 + if (tbMsg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) ||
  345 + tbMsg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ||
  346 + tbMsg.getType().equals(DataConstants.ATTRIBUTES_UPDATED) ||
  347 + tbMsg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
  348 + processCustomTbMsg(tenantId, tbMsg, callback);
  349 + } else {
  350 + try {
  351 + switch (tbMsg.getOriginator().getEntityType()) {
  352 + case ASSET:
  353 + processAsset(tenantId, tbMsg, callback);
  354 + break;
  355 + case DEVICE:
  356 + processDevice(tenantId, tbMsg, callback);
  357 + break;
  358 + case DASHBOARD:
  359 + processDashboard(tenantId, tbMsg, callback);
  360 + break;
  361 + case RULE_CHAIN:
  362 + processRuleChain(tenantId, tbMsg, callback);
  363 + break;
  364 + case ENTITY_VIEW:
  365 + processEntityView(tenantId, tbMsg, callback);
  366 + break;
  367 + default:
  368 + log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType());
  369 + }
  370 + } catch (IOException e) {
  371 + log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e);
327 372 }
328   - } catch (IOException e) {
329   - log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e);
330 373 }
  374 + }
331 375
332   -
  376 + private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) {
  377 + EdgeId edgeId = null;
  378 + EdgeQueueEntityType edgeQueueEntityType = null;
  379 + switch (tbMsg.getOriginator().getEntityType()) {
  380 + case DEVICE:
  381 + edgeQueueEntityType = EdgeQueueEntityType.DEVICE;
  382 + Device device = deviceService.findDeviceById(tenantId, new DeviceId(tbMsg.getOriginator().getId()));
  383 + edgeId = device.getEdgeId();
  384 + break;
  385 + case ASSET:
  386 + edgeQueueEntityType = EdgeQueueEntityType.ASSET;
  387 + Asset asset = assetService.findAssetById(tenantId, new AssetId(tbMsg.getOriginator().getId()));
  388 + edgeId = asset.getEdgeId();
  389 + break;
  390 + case ENTITY_VIEW:
  391 + edgeQueueEntityType = EdgeQueueEntityType.ENTITY_VIEW;
  392 + EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(tbMsg.getOriginator().getId()));
  393 + edgeId = entityView.getEdgeId();
  394 + break;
  395 + }
  396 + if (edgeId != null) {
  397 + try {
  398 + saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), mapper.writeValueAsString(tbMsg), callback);
  399 + } catch (IOException e) {
  400 + log.error("Error while saving custom tbMsg into Edge Queue", e);
  401 + }
  402 + }
333 403 }
334 404
335   - private void processDevice(TenantId tenantId, TbMsg tbMsg) throws IOException {
  405 + private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
336 406 switch (tbMsg.getType()) {
337 407 case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
338 408 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
339   - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE);
  409 + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE, callback);
340 410 break;
341 411 case DataConstants.ENTITY_DELETED:
342 412 case DataConstants.ENTITY_CREATED:
343 413 case DataConstants.ENTITY_UPDATED:
344 414 Device device = mapper.readValue(tbMsg.getData(), Device.class);
345 415 if (device.getEdgeId() != null) {
346   - pushEventsToEdge(tenantId, device.getEdgeId(), EdgeQueueEntityType.DEVICE, tbMsg);
  416 + pushEventToEdge(tenantId, device.getEdgeId(), EdgeQueueEntityType.DEVICE, tbMsg, callback);
347 417 }
348 418 break;
349 419 default:
... ... @@ -351,18 +421,18 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
351 421 }
352 422 }
353 423
354   - private void processAsset(TenantId tenantId, TbMsg tbMsg) throws IOException {
  424 + private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
355 425 switch (tbMsg.getType()) {
356 426 case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
357 427 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
358   - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET);
  428 + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET, callback);
359 429 break;
360 430 case DataConstants.ENTITY_DELETED:
361 431 case DataConstants.ENTITY_CREATED:
362 432 case DataConstants.ENTITY_UPDATED:
363 433 Asset asset = mapper.readValue(tbMsg.getData(), Asset.class);
364 434 if (asset.getEdgeId() != null) {
365   - pushEventsToEdge(tenantId, asset.getEdgeId(), EdgeQueueEntityType.ASSET, tbMsg);
  435 + pushEventToEdge(tenantId, asset.getEdgeId(), EdgeQueueEntityType.ASSET, tbMsg, callback);
366 436 }
367 437 break;
368 438 default:
... ... @@ -370,18 +440,18 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
370 440 }
371 441 }
372 442
373   - private void processEntityView(TenantId tenantId, TbMsg tbMsg) throws IOException {
  443 + private void processEntityView(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
374 444 switch (tbMsg.getType()) {
375 445 case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
376 446 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
377   - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW);
  447 + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW, callback);
378 448 break;
379 449 case DataConstants.ENTITY_DELETED:
380 450 case DataConstants.ENTITY_CREATED:
381 451 case DataConstants.ENTITY_UPDATED:
382 452 EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class);
383 453 if (entityView.getEdgeId() != null) {
384   - pushEventsToEdge(tenantId, entityView.getEdgeId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg);
  454 + pushEventToEdge(tenantId, entityView.getEdgeId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg, callback);
385 455 }
386 456 break;
387 457 default:
... ... @@ -389,15 +459,15 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
389 459 }
390 460 }
391 461
392   - private void processDashboard(TenantId tenantId, TbMsg tbMsg) throws IOException {
393   - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD);
  462 + private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
  463 + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD, callback);
394 464 }
395 465
396   - private void processRuleChain(TenantId tenantId, TbMsg tbMsg) throws IOException {
  466 + private void processRuleChain(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
397 467 switch (tbMsg.getType()) {
398 468 case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
399 469 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
400   - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN);
  470 + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN, callback);
401 471 break;
402 472 case DataConstants.ENTITY_DELETED:
403 473 case DataConstants.ENTITY_CREATED:
... ... @@ -405,7 +475,7 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
405 475 RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
406 476 if (ruleChain.getAssignedEdges() != null && !ruleChain.getAssignedEdges().isEmpty()) {
407 477 for (ShortEdgeInfo assignedEdge : ruleChain.getAssignedEdges()) {
408   - pushEventsToEdge(tenantId, assignedEdge.getEdgeId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg);
  478 + pushEventToEdge(tenantId, assignedEdge.getEdgeId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg, callback);
409 479 }
410 480 }
411 481 break;
... ... @@ -414,31 +484,31 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
414 484 }
415 485 }
416 486
417   - private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType) throws IOException {
  487 + private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType, FutureCallback<Void> callback) throws IOException {
418 488 EdgeId edgeId;
419 489 switch (tbMsg.getType()) {
420 490 case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
421 491 edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId")));
422   - pushEventsToEdge(tenantId, edgeId, entityType, tbMsg);
  492 + pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback);
423 493 break;
424 494 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
425 495 edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId")));
426   - pushEventsToEdge(tenantId, edgeId, entityType, tbMsg);
  496 + pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback);
427 497 break;
428 498 }
429 499 }
430 500
431   - private void pushEventsToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg) throws IOException {
  501 + private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
432 502 log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg);
433 503
434   - pushEventsToEdge(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData());
  504 + saveEventToEdgeQueue(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData(), callback);
435 505
436 506 if (entityType.equals(EdgeQueueEntityType.RULE_CHAIN)) {
437   - pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg);
  507 + pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg, callback);
438 508 }
439 509 }
440 510
441   - private void pushEventsToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data) throws IOException {
  511 + private void saveEventToEdgeQueue(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data, FutureCallback<Void> callback) throws IOException {
442 512 log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data);
443 513
444 514 EdgeQueueEntry queueEntry = new EdgeQueueEntry();
... ... @@ -451,17 +521,33 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
451 521 event.setTenantId(tenantId);
452 522 event.setType(DataConstants.EDGE_QUEUE_EVENT_TYPE);
453 523 event.setBody(mapper.valueToTree(queueEntry));
454   - eventService.saveAsync(event);
  524 + ListenableFuture<Event> saveFuture = eventService.saveAsync(event);
  525 +
  526 + addMainCallback(saveFuture, callback);
  527 + }
  528 +
  529 + private void addMainCallback(ListenableFuture<Event> saveFuture, final FutureCallback<Void> callback) {
  530 + Futures.addCallback(saveFuture, new FutureCallback<Event>() {
  531 + @Override
  532 + public void onSuccess(@Nullable Event result) {
  533 + callback.onSuccess(null);
  534 + }
  535 +
  536 + @Override
  537 + public void onFailure(Throwable t) {
  538 + callback.onFailure(t);
  539 + }
  540 + }, tsCallBackExecutor);
455 541 }
456 542
457   - private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg) throws IOException {
  543 + private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
458 544 RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
459 545 switch (tbMsg.getType()) {
460 546 case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
461 547 case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
462 548 case DataConstants.ENTITY_UPDATED:
463 549 RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId());
464   - pushEventsToEdge(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData));
  550 + saveEventToEdgeQueue(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData), callback);
465 551 break;
466 552 default:
467 553 log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
... ... @@ -474,10 +560,22 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
474 560 }
475 561
476 562 @Override
477   - public Edge setRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) {
  563 + public Edge setRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException {
478 564 edge.setRootRuleChainId(ruleChainId);
479 565 Edge saveEdge = saveEdge(edge);
480 566 ruleChainService.updateEdgeRuleChains(tenantId, saveEdge.getId());
  567 + RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId);
  568 + saveEventToEdgeQueue(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback<Void>() {
  569 + @Override
  570 + public void onSuccess(@Nullable Void aVoid) {
  571 + log.debug("Event saved successfully!");
  572 + }
  573 +
  574 + @Override
  575 + public void onFailure(Throwable t) {
  576 + log.debug("Failure during event save", t);
  577 + }
  578 + });
481 579 return saveEdge;
482 580 }
483 581
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -5,7 +5,7 @@
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7 7 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  8 + * http://www.apache.org/licenses/LICENSE-2.0
9 9 *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
... ...
... ... @@ -33,6 +33,7 @@ import org.thingsboard.server.dao.cassandra.CassandraCluster;
33 33 import org.thingsboard.server.dao.customer.CustomerService;
34 34 import org.thingsboard.server.dao.dashboard.DashboardService;
35 35 import org.thingsboard.server.dao.device.DeviceService;
  36 +import org.thingsboard.server.dao.edge.EdgeService;
36 37 import org.thingsboard.server.dao.entityview.EntityViewService;
37 38 import org.thingsboard.server.dao.nosql.CassandraStatementTask;
38 39 import org.thingsboard.server.dao.relation.RelationService;
... ... @@ -110,6 +111,8 @@ public interface TbContext {
110 111
111 112 EntityViewService getEntityViewService();
112 113
  114 + EdgeService getEdgeService();
  115 +
113 116 ListeningExecutor getJsExecutor();
114 117
115 118 ListeningExecutor getMailExecutor();
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.edge;
  17 +
  18 +import com.google.common.util.concurrent.FutureCallback;
  19 +import lombok.Data;
  20 +import org.thingsboard.rule.engine.api.TbContext;
  21 +import org.thingsboard.server.common.msg.TbMsg;
  22 +
  23 +import javax.annotation.Nullable;
  24 +
  25 +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
  26 +
  27 +@Data
  28 +class PushToEdgeNodeCallback implements FutureCallback<Void> {
  29 + private final TbContext ctx;
  30 + private final TbMsg msg;
  31 +
  32 + @Override
  33 + public void onSuccess(@Nullable Void result) {
  34 + ctx.tellNext(msg, SUCCESS);
  35 + }
  36 +
  37 + @Override
  38 + public void onFailure(Throwable t) {
  39 + ctx.tellFailure(msg, t);
  40 + }
  41 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.edge;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
  20 +import org.thingsboard.rule.engine.api.RuleNode;
  21 +import org.thingsboard.rule.engine.api.TbContext;
  22 +import org.thingsboard.rule.engine.api.TbNode;
  23 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  24 +import org.thingsboard.rule.engine.api.TbNodeException;
  25 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  26 +import org.thingsboard.server.common.data.plugin.ComponentType;
  27 +import org.thingsboard.server.common.msg.TbMsg;
  28 +
  29 +@Slf4j
  30 +@RuleNode(
  31 + type = ComponentType.ACTION,
  32 + name = "push to cloud",
  33 + configClazz = EmptyNodeConfiguration.class,
  34 + nodeDescription = "Pushes messages to cloud",
  35 + nodeDetails = "Pushes messages to cloud. This node is used only on Edge instances to push messages from Edge to Cloud.",
  36 + uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
  37 + configDirective = "tbNodeEmptyConfig",
  38 + icon = "cloud_upload"
  39 +)
  40 +public class TbMsgPushToCloudNode implements TbNode {
  41 +
  42 + private EmptyNodeConfiguration config;
  43 +
  44 + @Override
  45 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  46 + this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
  47 + }
  48 +
  49 + @Override
  50 + public void onMsg(TbContext ctx, TbMsg msg) {
  51 + // Implementation of this node is done on the Edge
  52 + }
  53 +
  54 + @Override
  55 + public void destroy() {
  56 + }
  57 +
  58 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.edge;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
  20 +import org.thingsboard.rule.engine.api.RuleNode;
  21 +import org.thingsboard.rule.engine.api.TbContext;
  22 +import org.thingsboard.rule.engine.api.TbNode;
  23 +import org.thingsboard.rule.engine.api.TbNodeConfiguration;
  24 +import org.thingsboard.rule.engine.api.TbNodeException;
  25 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  26 +import org.thingsboard.server.common.data.plugin.ComponentType;
  27 +import org.thingsboard.server.common.msg.TbMsg;
  28 +
  29 +@Slf4j
  30 +@RuleNode(
  31 + type = ComponentType.ACTION,
  32 + name = "push to edge",
  33 + configClazz = EmptyNodeConfiguration.class,
  34 + nodeDescription = "Pushes messages to edge",
  35 + nodeDetails = "Pushes messages to edge, if Message Originator assigned to particular edge or is EDGE entity. This node is used only on Cloud instances to push messages from Cloud to Edge. Supports only DEVICE, ENTITY_VIEW, ASSET and EDGE Message Originator(s).",
  36 + uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
  37 + configDirective = "tbNodeEmptyConfig",
  38 + icon = "cloud_download"
  39 +)
  40 +public class TbMsgPushToEdgeNode implements TbNode {
  41 +
  42 + private EmptyNodeConfiguration config;
  43 +
  44 + @Override
  45 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  46 + this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class);
  47 + }
  48 +
  49 + @Override
  50 + public void onMsg(TbContext ctx, TbMsg msg) {
  51 + ctx.getEdgeService().pushEventToEdge(ctx.getTenantId(), msg, new PushToEdgeNodeCallback(ctx, msg));
  52 + }
  53 +
  54 + @Override
  55 + public void destroy() {
  56 + }
  57 +
  58 +}
... ...