Commit 58cdc63abe433fae1757b3d97bf984745e911202

Authored by Volodymyr Babak
1 parent 69ba71be

Push edge entities to edge on edge connect

... ... @@ -29,9 +29,13 @@ import org.thingsboard.server.dao.device.DeviceService;
29 29 import org.thingsboard.server.dao.edge.EdgeService;
30 30 import org.thingsboard.server.dao.entityview.EntityViewService;
31 31 import org.thingsboard.server.dao.relation.RelationService;
32   -import org.thingsboard.server.service.edge.rpc.alarm.AlarmMetadataConstructor;
  32 +import org.thingsboard.server.service.edge.rpc.constructor.AlarmUpdateMsgConstructor;
  33 +import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor;
  34 +import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor;
  35 +import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor;
  36 +import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor;
33 37 import org.thingsboard.server.service.edge.rpc.init.InitEdgeService;
34   -import org.thingsboard.server.service.edge.rpc.ruleChain.RuleChainMetadataConstructor;
  38 +import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor;
35 39
36 40 @Component
37 41 @Data
... ... @@ -83,9 +87,25 @@ public class EdgeContextComponent {
83 87
84 88 @Lazy
85 89 @Autowired
86   - private RuleChainMetadataConstructor ruleChainMetadataConstructor;
  90 + private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor;
87 91
88 92 @Lazy
89 93 @Autowired
90   - private AlarmMetadataConstructor alarmMetadataConstructor;
  94 + private AlarmUpdateMsgConstructor alarmUpdateMsgConstructor;
  95 +
  96 + @Lazy
  97 + @Autowired
  98 + private DeviceUpdateMsgConstructor deviceUpdateMsgConstructor;
  99 +
  100 + @Lazy
  101 + @Autowired
  102 + private AssetUpdateMsgConstructor assetUpdateMsgConstructor;
  103 +
  104 + @Lazy
  105 + @Autowired
  106 + private EntityViewUpdateMsgConstructor entityViewUpdateMsgConstructor;
  107 +
  108 + @Lazy
  109 + @Autowired
  110 + private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor;
91 111 }
... ...
... ... @@ -29,7 +29,6 @@ import org.thingsboard.server.common.data.Customer;
29 29 import org.thingsboard.server.common.data.Dashboard;
30 30 import org.thingsboard.server.common.data.DataConstants;
31 31 import org.thingsboard.server.common.data.Device;
32   -import org.thingsboard.server.common.data.EntityType;
33 32 import org.thingsboard.server.common.data.EntityView;
34 33 import org.thingsboard.server.common.data.Event;
35 34 import org.thingsboard.server.common.data.User;
... ... @@ -37,7 +36,6 @@ import org.thingsboard.server.common.data.alarm.Alarm;
37 36 import org.thingsboard.server.common.data.alarm.AlarmSeverity;
38 37 import org.thingsboard.server.common.data.alarm.AlarmStatus;
39 38 import org.thingsboard.server.common.data.asset.Asset;
40   -import org.thingsboard.server.common.data.audit.ActionType;
41 39 import org.thingsboard.server.common.data.edge.Edge;
42 40 import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
43 41 import org.thingsboard.server.common.data.id.AssetId;
... ... @@ -45,7 +43,6 @@ import org.thingsboard.server.common.data.id.CustomerId;
45 43 import org.thingsboard.server.common.data.id.DeviceId;
46 44 import org.thingsboard.server.common.data.id.EdgeId;
47 45 import org.thingsboard.server.common.data.id.EntityId;
48   -import org.thingsboard.server.common.data.id.EntityIdFactory;
49 46 import org.thingsboard.server.common.data.id.EntityViewId;
50 47 import org.thingsboard.server.common.data.id.TenantId;
51 48 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
... ... @@ -56,48 +53,36 @@ import org.thingsboard.server.common.data.page.TimePageData;
56 53 import org.thingsboard.server.common.data.page.TimePageLink;
57 54 import org.thingsboard.server.common.data.relation.EntityRelation;
58 55 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
59   -import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
60 56 import org.thingsboard.server.common.data.rule.RuleChain;
61   -import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
62 57 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
63   -import org.thingsboard.server.common.data.rule.RuleNode;
64 58 import org.thingsboard.server.common.msg.TbMsg;
65 59 import org.thingsboard.server.common.msg.TbMsgDataType;
66 60 import org.thingsboard.server.common.msg.TbMsgMetaData;
67 61 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
68 62 import org.thingsboard.server.common.msg.session.SessionMsgType;
69 63 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
70   -import org.thingsboard.server.dao.util.mapping.JacksonUtil;
71 64 import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
72   -import org.thingsboard.server.gen.edge.AssetUpdateMsg;
73 65 import org.thingsboard.server.gen.edge.ConnectRequestMsg;
74 66 import org.thingsboard.server.gen.edge.ConnectResponseCode;
75 67 import org.thingsboard.server.gen.edge.ConnectResponseMsg;
76 68 import org.thingsboard.server.gen.edge.CustomerUpdateMsg;
77   -import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
78 69 import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
79 70 import org.thingsboard.server.gen.edge.DownlinkMsg;
80 71 import org.thingsboard.server.gen.edge.EdgeConfiguration;
81 72 import org.thingsboard.server.gen.edge.EntityDataProto;
82 73 import org.thingsboard.server.gen.edge.EntityUpdateMsg;
83   -import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
84   -import org.thingsboard.server.gen.edge.NodeConnectionInfoProto;
85 74 import org.thingsboard.server.gen.edge.RequestMsg;
86 75 import org.thingsboard.server.gen.edge.RequestMsgType;
87 76 import org.thingsboard.server.gen.edge.ResponseMsg;
88   -import org.thingsboard.server.gen.edge.RuleChainConnectionInfoProto;
  77 +import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
89 78 import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
90   -import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
91   -import org.thingsboard.server.gen.edge.RuleNodeProto;
92 79 import org.thingsboard.server.gen.edge.UpdateMsgType;
93 80 import org.thingsboard.server.gen.edge.UplinkMsg;
94 81 import org.thingsboard.server.gen.edge.UplinkResponseMsg;
95 82 import org.thingsboard.server.gen.edge.UserUpdateMsg;
96 83 import org.thingsboard.server.service.edge.EdgeContextComponent;
97 84
98   -import javax.swing.text.html.parser.Entity;
99 85 import java.io.IOException;
100   -import java.util.ArrayList;
101 86 import java.util.Collections;
102 87 import java.util.List;
103 88 import java.util.Optional;
... ... @@ -108,7 +93,6 @@ import java.util.function.BiConsumer;
108 93 import java.util.function.Consumer;
109 94
110 95 import static org.thingsboard.server.gen.edge.UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
111   -import static org.thingsboard.server.gen.edge.UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
112 96
113 97 @Slf4j
114 98 @Data
... ... @@ -176,7 +160,6 @@ public final class EdgeGrpcSession implements Cloneable {
176 160 };
177 161 }
178 162
179   -
180 163 void processHandleMessages() throws ExecutionException, InterruptedException {
181 164 Long queueStartTs = getQueueStartTs().get();
182 165 // TODO: this 100 value must be changed properly
... ... @@ -184,7 +167,7 @@ public final class EdgeGrpcSession implements Cloneable {
184 167 TimePageData<Event> pageData;
185 168 UUID ifOffset = null;
186 169 do {
187   - pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink);
  170 + pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink);
188 171 if (!pageData.getData().isEmpty()) {
189 172 log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
190 173 for (Event event : pageData.getData()) {
... ... @@ -363,13 +346,13 @@ public final class EdgeGrpcSession implements Cloneable {
363 346 ListenableFuture<Optional<AttributeKvEntry>> future =
364 347 ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, "queueStartTs");
365 348 return Futures.transform(future, attributeKvEntryOpt -> {
366   - if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
367   - AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();
368   - return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
369   - } else {
370   - return 0L;
371   - }
372   - } );
  349 + if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
  350 + AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();
  351 + return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
  352 + } else {
  353 + return 0L;
  354 + }
  355 + });
373 356 }
374 357
375 358 private void onEdgeUpdated(UpdateMsgType msgType, Edge edge) {
... ... @@ -379,7 +362,7 @@ public final class EdgeGrpcSession implements Cloneable {
379 362
380 363 private void onDeviceUpdated(UpdateMsgType msgType, Device device) {
381 364 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
382   - .setDeviceUpdateMsg(constructDeviceUpdatedMsg(msgType, device))
  365 + .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(msgType, device))
383 366 .build();
384 367 outputStream.onNext(ResponseMsg.newBuilder()
385 368 .setEntityUpdateMsg(entityUpdateMsg)
... ... @@ -388,7 +371,7 @@ public final class EdgeGrpcSession implements Cloneable {
388 371
389 372 private void onAssetUpdated(UpdateMsgType msgType, Asset asset) {
390 373 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
391   - .setAssetUpdateMsg(constructAssetUpdatedMsg(msgType, asset))
  374 + .setAssetUpdateMsg(ctx.getAssetUpdateMsgConstructor().constructAssetUpdatedMsg(msgType, asset))
392 375 .build();
393 376 outputStream.onNext(ResponseMsg.newBuilder()
394 377 .setEntityUpdateMsg(entityUpdateMsg)
... ... @@ -397,7 +380,7 @@ public final class EdgeGrpcSession implements Cloneable {
397 380
398 381 private void onEntityViewUpdated(UpdateMsgType msgType, EntityView entityView) {
399 382 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
400   - .setEntityViewUpdateMsg(constructEntityViewUpdatedMsg(msgType, entityView))
  383 + .setEntityViewUpdateMsg(ctx.getEntityViewUpdateMsgConstructor().constructEntityViewUpdatedMsg(msgType, entityView))
401 384 .build();
402 385 outputStream.onNext(ResponseMsg.newBuilder()
403 386 .setEntityUpdateMsg(entityUpdateMsg)
... ... @@ -406,7 +389,7 @@ public final class EdgeGrpcSession implements Cloneable {
406 389
407 390 private void onRuleChainUpdated(UpdateMsgType msgType, RuleChain ruleChain) {
408 391 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
409   - .setRuleChainUpdateMsg(ctx.getRuleChainMetadataConstructor().constructRuleChainUpdatedMsg(edge, msgType, ruleChain))
  392 + .setRuleChainUpdateMsg(ctx.getRuleChainUpdateMsgConstructor().constructRuleChainUpdatedMsg(edge, msgType, ruleChain))
410 393 .build();
411 394 outputStream.onNext(ResponseMsg.newBuilder()
412 395 .setEntityUpdateMsg(entityUpdateMsg)
... ... @@ -415,7 +398,7 @@ public final class EdgeGrpcSession implements Cloneable {
415 398
416 399 private void onRuleChainMetadataUpdated(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
417 400 RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
418   - ctx.getRuleChainMetadataConstructor().constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
  401 + ctx.getRuleChainUpdateMsgConstructor().constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
419 402 if (ruleChainMetadataUpdateMsg != null) {
420 403 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
421 404 .setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
... ... @@ -428,7 +411,7 @@ public final class EdgeGrpcSession implements Cloneable {
428 411
429 412 private void onDashboardUpdated(UpdateMsgType msgType, Dashboard dashboard) {
430 413 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
431   - .setDashboardUpdateMsg(constructDashboardUpdatedMsg(msgType, dashboard))
  414 + .setDashboardUpdateMsg(ctx.getDashboardUpdateMsgConstructor().constructDashboardUpdatedMsg(msgType, dashboard))
432 415 .build();
433 416 outputStream.onNext(ResponseMsg.newBuilder()
434 417 .setEntityUpdateMsg(entityUpdateMsg)
... ... @@ -437,7 +420,7 @@ public final class EdgeGrpcSession implements Cloneable {
437 420
438 421 private void onAlarmUpdated(UpdateMsgType msgType, Alarm alarm) {
439 422 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
440   - .setAlarmUpdateMsg(ctx.getAlarmMetadataConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))
  423 + .setAlarmUpdateMsg(ctx.getAlarmUpdateMsgConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))
441 424 .build();
442 425 outputStream.onNext(ResponseMsg.newBuilder()
443 426 .setEntityUpdateMsg(entityUpdateMsg)
... ... @@ -481,17 +464,6 @@ public final class EdgeGrpcSession implements Cloneable {
481 464 return builder.build();
482 465 }
483 466
484   - private DashboardUpdateMsg constructDashboardUpdatedMsg(UpdateMsgType msgType, Dashboard dashboard) {
485   - dashboard = ctx.getDashboardService().findDashboardById(edge.getTenantId(), dashboard.getId());
486   - DashboardUpdateMsg.Builder builder = DashboardUpdateMsg.newBuilder()
487   - .setMsgType(msgType)
488   - .setIdMSB(dashboard.getId().getId().getMostSignificantBits())
489   - .setIdLSB(dashboard.getId().getId().getLeastSignificantBits())
490   - .setTitle(dashboard.getTitle())
491   - .setConfiguration(JacksonUtil.toString(dashboard.getConfiguration()));
492   - return builder.build();
493   - }
494   -
495 467 private CustomerUpdateMsg constructCustomerUpdatedMsg(UpdateMsgType msgType, Customer customer) {
496 468 CustomerUpdateMsg.Builder builder = CustomerUpdateMsg.newBuilder()
497 469 .setMsgType(msgType);
... ... @@ -504,47 +476,6 @@ public final class EdgeGrpcSession implements Cloneable {
504 476 return builder.build();
505 477 }
506 478
507   - private DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) {
508   - DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
509   - .setMsgType(msgType)
510   - .setName(device.getName())
511   - .setType(device.getType());
512   - return builder.build();
513   - }
514   -
515   - private AssetUpdateMsg constructAssetUpdatedMsg(UpdateMsgType msgType, Asset asset) {
516   - AssetUpdateMsg.Builder builder = AssetUpdateMsg.newBuilder()
517   - .setMsgType(msgType)
518   - .setName(asset.getName())
519   - .setType(asset.getType());
520   - return builder.build();
521   - }
522   -
523   - private EntityViewUpdateMsg constructEntityViewUpdatedMsg(UpdateMsgType msgType, EntityView entityView) {
524   - String relatedName;
525   - String relatedType;
526   - org.thingsboard.server.gen.edge.EntityType relatedEntityType;
527   - if (entityView.getEntityId().getEntityType().equals(EntityType.DEVICE)) {
528   - Device device = ctx.getDeviceService().findDeviceById(entityView.getTenantId(), new DeviceId(entityView.getEntityId().getId()));
529   - relatedName = device.getName();
530   - relatedType = device.getType();
531   - relatedEntityType = org.thingsboard.server.gen.edge.EntityType.DEVICE;
532   - } else {
533   - Asset asset = ctx.getAssetService().findAssetById(entityView.getTenantId(), new AssetId(entityView.getEntityId().getId()));
534   - relatedName = asset.getName();
535   - relatedType = asset.getType();
536   - relatedEntityType = org.thingsboard.server.gen.edge.EntityType.ASSET;
537   - }
538   - EntityViewUpdateMsg.Builder builder = EntityViewUpdateMsg.newBuilder()
539   - .setMsgType(msgType)
540   - .setName(entityView.getName())
541   - .setType(entityView.getType())
542   - .setRelatedName(relatedName)
543   - .setRelatedType(relatedType)
544   - .setRelatedEntityType(relatedEntityType);
545   - return builder.build();
546   - }
547   -
548 479 private UplinkResponseMsg processUplinkMsg(UplinkMsg uplinkMsg) {
549 480 try {
550 481 if (uplinkMsg.getEntityDataList() != null && !uplinkMsg.getEntityDataList().isEmpty()) {
... ... @@ -594,11 +525,16 @@ public final class EdgeGrpcSession implements Cloneable {
594 525 }
595 526 }
596 527 }
597   - if (uplinkMsg.getAlarmUpdatemsgList() != null && !uplinkMsg.getAlarmUpdatemsgList().isEmpty()) {
598   - for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdatemsgList()) {
  528 + if (uplinkMsg.getAlarmUpdateMsgList() != null && !uplinkMsg.getAlarmUpdateMsgList().isEmpty()) {
  529 + for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
599 530 onAlarmUpdate(alarmUpdateMsg);
600 531 }
601 532 }
  533 + if (uplinkMsg.getRuleChainMetadataRequestMsgList() != null && !uplinkMsg.getRuleChainMetadataRequestMsgList().isEmpty()) {
  534 + for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) {
  535 + ctx.getInitEdgeService().initRuleChainMetadata(edge, ruleChainMetadataRequestMsg, outputStream);
  536 + }
  537 + }
602 538 } catch (Exception e) {
603 539 return UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(e.getMessage()).build();
604 540 }
... ...
application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmUpdateMsgConstructor.java renamed from application/src/main/java/org/thingsboard/server/service/edge/rpc/alarm/AlarmMetadataConstructor.java
1   -package org.thingsboard.server.service.edge.rpc.alarm;
  1 +package org.thingsboard.server.service.edge.rpc.constructor;
2 2
3 3 import lombok.extern.slf4j.Slf4j;
4 4 import org.bouncycastle.jcajce.provider.symmetric.DES;
... ... @@ -19,7 +19,7 @@ import org.thingsboard.server.gen.edge.UpdateMsgType;
19 19
20 20 @Component
21 21 @Slf4j
22   -public class AlarmMetadataConstructor {
  22 +public class AlarmUpdateMsgConstructor {
23 23
24 24 @Autowired
25 25 private DeviceService deviceService;
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.edge.rpc.constructor;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.data.asset.Asset;
  21 +import org.thingsboard.server.gen.edge.AssetUpdateMsg;
  22 +import org.thingsboard.server.gen.edge.UpdateMsgType;
  23 +
  24 +@Component
  25 +@Slf4j
  26 +public class AssetUpdateMsgConstructor {
  27 +
  28 + public AssetUpdateMsg constructAssetUpdatedMsg(UpdateMsgType msgType, Asset asset) {
  29 + AssetUpdateMsg.Builder builder = AssetUpdateMsg.newBuilder()
  30 + .setMsgType(msgType)
  31 + .setName(asset.getName())
  32 + .setType(asset.getType());
  33 + return builder.build();
  34 + }
  35 +
  36 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.edge.rpc.constructor;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.stereotype.Component;
  21 +import org.thingsboard.server.common.data.Dashboard;
  22 +import org.thingsboard.server.dao.dashboard.DashboardService;
  23 +import org.thingsboard.server.dao.util.mapping.JacksonUtil;
  24 +import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
  25 +import org.thingsboard.server.gen.edge.UpdateMsgType;
  26 +
  27 +@Component
  28 +@Slf4j
  29 +public class DashboardUpdateMsgConstructor {
  30 +
  31 + @Autowired
  32 + private DashboardService dashboardService;
  33 +
  34 + public DashboardUpdateMsg constructDashboardUpdatedMsg(UpdateMsgType msgType, Dashboard dashboard) {
  35 + dashboard = dashboardService.findDashboardById(dashboard.getTenantId(), dashboard.getId());
  36 + DashboardUpdateMsg.Builder builder = DashboardUpdateMsg.newBuilder()
  37 + .setMsgType(msgType)
  38 + .setIdMSB(dashboard.getId().getId().getMostSignificantBits())
  39 + .setIdLSB(dashboard.getId().getId().getLeastSignificantBits())
  40 + .setTitle(dashboard.getTitle())
  41 + .setConfiguration(JacksonUtil.toString(dashboard.getConfiguration()));
  42 + return builder.build();
  43 + }
  44 +
  45 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.edge.rpc.constructor;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.data.Device;
  21 +import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
  22 +import org.thingsboard.server.gen.edge.UpdateMsgType;
  23 +
  24 +@Component
  25 +@Slf4j
  26 +public class DeviceUpdateMsgConstructor {
  27 +
  28 + public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) {
  29 + DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
  30 + .setMsgType(msgType)
  31 + .setName(device.getName())
  32 + .setType(device.getType());
  33 + return builder.build();
  34 + }
  35 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.edge.rpc.constructor;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.beans.factory.annotation.Autowired;
  20 +import org.springframework.stereotype.Component;
  21 +import org.thingsboard.server.common.data.Device;
  22 +import org.thingsboard.server.common.data.EntityType;
  23 +import org.thingsboard.server.common.data.EntityView;
  24 +import org.thingsboard.server.common.data.asset.Asset;
  25 +import org.thingsboard.server.common.data.id.AssetId;
  26 +import org.thingsboard.server.common.data.id.DeviceId;
  27 +import org.thingsboard.server.dao.asset.AssetService;
  28 +import org.thingsboard.server.dao.device.DeviceService;
  29 +import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
  30 +import org.thingsboard.server.gen.edge.UpdateMsgType;
  31 +
  32 +@Component
  33 +@Slf4j
  34 +public class EntityViewUpdateMsgConstructor {
  35 +
  36 + @Autowired
  37 + private DeviceService deviceService;
  38 +
  39 + @Autowired
  40 + private AssetService assetService;
  41 +
  42 + public EntityViewUpdateMsg constructEntityViewUpdatedMsg(UpdateMsgType msgType, EntityView entityView) {
  43 + String relatedName;
  44 + String relatedType;
  45 + org.thingsboard.server.gen.edge.EntityType relatedEntityType;
  46 + if (entityView.getEntityId().getEntityType().equals(EntityType.DEVICE)) {
  47 + Device device = deviceService.findDeviceById(entityView.getTenantId(), new DeviceId(entityView.getEntityId().getId()));
  48 + relatedName = device.getName();
  49 + relatedType = device.getType();
  50 + relatedEntityType = org.thingsboard.server.gen.edge.EntityType.DEVICE;
  51 + } else {
  52 + Asset asset = assetService.findAssetById(entityView.getTenantId(), new AssetId(entityView.getEntityId().getId()));
  53 + relatedName = asset.getName();
  54 + relatedType = asset.getType();
  55 + relatedEntityType = org.thingsboard.server.gen.edge.EntityType.ASSET;
  56 + }
  57 + EntityViewUpdateMsg.Builder builder = EntityViewUpdateMsg.newBuilder()
  58 + .setMsgType(msgType)
  59 + .setName(entityView.getName())
  60 + .setType(entityView.getType())
  61 + .setRelatedName(relatedName)
  62 + .setRelatedType(relatedType)
  63 + .setRelatedEntityType(relatedEntityType);
  64 + return builder.build();
  65 + }
  66 +
  67 +}
... ...
application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RuleChainUpdateMsgConstructor.java renamed from application/src/main/java/org/thingsboard/server/service/edge/rpc/ruleChain/RuleChainMetadataConstructor.java
1   -package org.thingsboard.server.service.edge.rpc.ruleChain;
  1 +package org.thingsboard.server.service.edge.rpc.constructor;
2 2
3 3 import com.fasterxml.jackson.core.JsonProcessingException;
4 4 import com.fasterxml.jackson.databind.ObjectMapper;
... ... @@ -23,7 +23,7 @@ import java.util.List;
23 23
24 24 @Component
25 25 @Slf4j
26   -public class RuleChainMetadataConstructor {
  26 +public class RuleChainUpdateMsgConstructor {
27 27
28 28 private static final ObjectMapper objectMapper = new ObjectMapper();
29 29
... ... @@ -111,7 +111,6 @@ public class RuleChainMetadataConstructor {
111 111 }
112 112
113 113
114   -
115 114 private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException {
116 115 return RuleNodeProto.newBuilder()
117 116 .setIdMSB(node.getId().getId().getMostSignificantBits())
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.edge.rpc.init;
2 17
3 18 import io.grpc.stub.StreamObserver;
4 19 import lombok.extern.slf4j.Slf4j;
5 20 import org.springframework.beans.factory.annotation.Autowired;
6 21 import org.springframework.stereotype.Service;
  22 +import org.thingsboard.server.common.data.Dashboard;
  23 +import org.thingsboard.server.common.data.DashboardInfo;
  24 +import org.thingsboard.server.common.data.Device;
  25 +import org.thingsboard.server.common.data.EntityView;
  26 +import org.thingsboard.server.common.data.asset.Asset;
7 27 import org.thingsboard.server.common.data.edge.Edge;
  28 +import org.thingsboard.server.common.data.id.RuleChainId;
  29 +import org.thingsboard.server.common.data.page.TextPageData;
  30 +import org.thingsboard.server.common.data.page.TextPageLink;
8 31 import org.thingsboard.server.common.data.page.TimePageData;
9 32 import org.thingsboard.server.common.data.page.TimePageLink;
10 33 import org.thingsboard.server.common.data.rule.RuleChain;
11 34 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
  35 +import org.thingsboard.server.dao.asset.AssetService;
  36 +import org.thingsboard.server.dao.dashboard.DashboardService;
  37 +import org.thingsboard.server.dao.device.DeviceService;
  38 +import org.thingsboard.server.dao.entityview.EntityViewService;
12 39 import org.thingsboard.server.dao.rule.RuleChainService;
  40 +import org.thingsboard.server.gen.edge.AssetUpdateMsg;
  41 +import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
  42 +import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
13 43 import org.thingsboard.server.gen.edge.EntityUpdateMsg;
  44 +import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
14 45 import org.thingsboard.server.gen.edge.ResponseMsg;
  46 +import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
15 47 import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
16 48 import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
17 49 import org.thingsboard.server.gen.edge.UpdateMsgType;
18   -import org.thingsboard.server.service.edge.rpc.ruleChain.RuleChainMetadataConstructor;
  50 +import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor;
  51 +import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor;
  52 +import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor;
  53 +import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor;
  54 +import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor;
  55 +
  56 +import java.util.UUID;
19 57
20 58 @Service
21 59 @Slf4j
... ... @@ -25,11 +63,160 @@ public class DefaultInitEdgeService implements InitEdgeService {
25 63 private RuleChainService ruleChainService;
26 64
27 65 @Autowired
28   - private RuleChainMetadataConstructor ruleChainMetadataConstructor;
  66 + private DeviceService deviceService;
  67 +
  68 + @Autowired
  69 + private AssetService assetService;
  70 +
  71 + @Autowired
  72 + private EntityViewService entityViewService;
  73 +
  74 + @Autowired
  75 + private DashboardService dashboardService;
  76 +
  77 + @Autowired
  78 + private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor;
  79 +
  80 + @Autowired
  81 + private DeviceUpdateMsgConstructor deviceUpdateMsgConstructor;
  82 +
  83 + @Autowired
  84 + private AssetUpdateMsgConstructor assetUpdateMsgConstructor;
  85 +
  86 + @Autowired
  87 + private EntityViewUpdateMsgConstructor entityViewUpdateMsgConstructor;
  88 +
  89 + @Autowired
  90 + private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor;
29 91
30 92 @Override
31 93 public void init(Edge edge, StreamObserver<ResponseMsg> outputStream) {
32 94 initRuleChains(edge, outputStream);
  95 + initDevices(edge, outputStream);
  96 + initAssets(edge, outputStream);
  97 + initEntityViews(edge, outputStream);
  98 + initDashboards(edge, outputStream);
  99 + }
  100 +
  101 + private void initDevices(Edge edge, StreamObserver<ResponseMsg> outputStream) {
  102 + try {
  103 + TextPageLink pageLink = new TextPageLink(100);
  104 + TextPageData<Device> pageData;
  105 + do {
  106 + pageData = deviceService.findDevicesByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
  107 + if (!pageData.getData().isEmpty()) {
  108 + log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
  109 + for (Device device : pageData.getData()) {
  110 + DeviceUpdateMsg deviceUpdateMsg =
  111 + deviceUpdateMsgConstructor.constructDeviceUpdatedMsg(
  112 + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE,
  113 + device);
  114 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  115 + .setDeviceUpdateMsg(deviceUpdateMsg)
  116 + .build();
  117 + outputStream.onNext(ResponseMsg.newBuilder()
  118 + .setEntityUpdateMsg(entityUpdateMsg)
  119 + .build());
  120 + }
  121 + }
  122 + if (pageData.hasNext()) {
  123 + pageLink = pageData.getNextPageLink();
  124 + }
  125 + } while (pageData.hasNext());
  126 + } catch (Exception e) {
  127 + log.error("Exception during loading edge device(s) on init!");
  128 + }
  129 + }
  130 +
  131 + private void initAssets(Edge edge, StreamObserver<ResponseMsg> outputStream) {
  132 + try {
  133 + TextPageLink pageLink = new TextPageLink(100);
  134 + TextPageData<Asset> pageData;
  135 + do {
  136 + pageData = assetService.findAssetsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
  137 + if (!pageData.getData().isEmpty()) {
  138 + log.trace("[{}] [{}] asset(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
  139 + for (Asset asset : pageData.getData()) {
  140 + AssetUpdateMsg assetUpdateMsg =
  141 + assetUpdateMsgConstructor.constructAssetUpdatedMsg(
  142 + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE,
  143 + asset);
  144 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  145 + .setAssetUpdateMsg(assetUpdateMsg)
  146 + .build();
  147 + outputStream.onNext(ResponseMsg.newBuilder()
  148 + .setEntityUpdateMsg(entityUpdateMsg)
  149 + .build());
  150 + }
  151 + }
  152 + if (pageData.hasNext()) {
  153 + pageLink = pageData.getNextPageLink();
  154 + }
  155 + } while (pageData.hasNext());
  156 + } catch (Exception e) {
  157 + log.error("Exception during loading edge asset(s) on init!");
  158 + }
  159 + }
  160 +
  161 + private void initEntityViews(Edge edge, StreamObserver<ResponseMsg> outputStream) {
  162 + try {
  163 + TextPageLink pageLink = new TextPageLink(100);
  164 + TextPageData<EntityView> pageData;
  165 + do {
  166 + pageData = entityViewService.findEntityViewsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
  167 + if (!pageData.getData().isEmpty()) {
  168 + log.trace("[{}] [{}] entity view(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
  169 + for (EntityView entityView : pageData.getData()) {
  170 + EntityViewUpdateMsg entityViewUpdateMsg =
  171 + entityViewUpdateMsgConstructor.constructEntityViewUpdatedMsg(
  172 + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE,
  173 + entityView);
  174 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  175 + .setEntityViewUpdateMsg(entityViewUpdateMsg)
  176 + .build();
  177 + outputStream.onNext(ResponseMsg.newBuilder()
  178 + .setEntityUpdateMsg(entityUpdateMsg)
  179 + .build());
  180 + }
  181 + }
  182 + if (pageData.hasNext()) {
  183 + pageLink = pageData.getNextPageLink();
  184 + }
  185 + } while (pageData.hasNext());
  186 + } catch (Exception e) {
  187 + log.error("Exception during loading edge entity view(s) on init!");
  188 + }
  189 + }
  190 +
  191 + private void initDashboards(Edge edge, StreamObserver<ResponseMsg> outputStream) {
  192 + try {
  193 + TimePageLink pageLink = new TimePageLink(100);
  194 + TimePageData<DashboardInfo> pageData;
  195 + do {
  196 + pageData = dashboardService.findDashboardsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get();
  197 + if (!pageData.getData().isEmpty()) {
  198 + log.trace("[{}] [{}] dashboard(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
  199 + for (DashboardInfo dashboardInfo : pageData.getData()) {
  200 + Dashboard dashboard = dashboardService.findDashboardById(edge.getTenantId(), dashboardInfo.getId());
  201 + DashboardUpdateMsg dashboardUpdateMsg =
  202 + dashboardUpdateMsgConstructor.constructDashboardUpdatedMsg(
  203 + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE,
  204 + dashboard);
  205 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  206 + .setDashboardUpdateMsg(dashboardUpdateMsg)
  207 + .build();
  208 + outputStream.onNext(ResponseMsg.newBuilder()
  209 + .setEntityUpdateMsg(entityUpdateMsg)
  210 + .build());
  211 + }
  212 + }
  213 + if (pageData.hasNext()) {
  214 + pageLink = pageData.getNextPageLink();
  215 + }
  216 + } while (pageData.hasNext());
  217 + } catch (Exception e) {
  218 + log.error("Exception during loading edge dashboard(s) on init!");
  219 + }
33 220 }
34 221
35 222 private void initRuleChains(Edge edge, StreamObserver<ResponseMsg> outputStream) {
... ... @@ -42,7 +229,7 @@ public class DefaultInitEdgeService implements InitEdgeService {
42 229 log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
43 230 for (RuleChain ruleChain : pageData.getData()) {
44 231 RuleChainUpdateMsg ruleChainUpdateMsg =
45   - ruleChainMetadataConstructor.constructRuleChainUpdatedMsg(
  232 + ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg(
46 233 edge,
47 234 UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE,
48 235 ruleChain);
... ... @@ -52,20 +239,6 @@ public class DefaultInitEdgeService implements InitEdgeService {
52 239 outputStream.onNext(ResponseMsg.newBuilder()
53 240 .setEntityUpdateMsg(entityUpdateMsg)
54 241 .build());
55   -
56   - RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edge.getTenantId(), ruleChain.getId());
57   - RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
58   - ruleChainMetadataConstructor.constructRuleChainMetadataUpdatedMsg(
59   - UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE,
60   - ruleChainMetaData);
61   - if (ruleChainMetadataUpdateMsg != null) {
62   - entityUpdateMsg = EntityUpdateMsg.newBuilder()
63   - .setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
64   - .build();
65   - outputStream.onNext(ResponseMsg.newBuilder()
66   - .setEntityUpdateMsg(entityUpdateMsg)
67   - .build());
68   - }
69 242 }
70 243 }
71 244 if (pageData.hasNext()) {
... ... @@ -73,8 +246,27 @@ public class DefaultInitEdgeService implements InitEdgeService {
73 246 }
74 247 } while (pageData.hasNext());
75 248 } catch (Exception e) {
76   - log.error("Exception during loading edge rule chains on init!");
  249 + log.error("Exception during loading edge rule chain(s) on init!");
77 250 }
  251 + }
78 252
  253 + @Override
  254 + public void initRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream) {
  255 + if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
  256 + RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
  257 + RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edge.getTenantId(), ruleChainId);
  258 + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
  259 + ruleChainUpdateMsgConstructor.constructRuleChainMetadataUpdatedMsg(
  260 + UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE,
  261 + ruleChainMetaData);
  262 + if (ruleChainMetadataUpdateMsg != null) {
  263 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  264 + .setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
  265 + .build();
  266 + outputStream.onNext(ResponseMsg.newBuilder()
  267 + .setEntityUpdateMsg(entityUpdateMsg)
  268 + .build());
  269 + }
  270 + }
79 271 }
80 272 }
... ...
  1 +/**
  2 + * Copyright © 2016-2019 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.service.edge.rpc.init;
2 17
3 18 import io.grpc.stub.StreamObserver;
4 19 import org.thingsboard.server.common.data.edge.Edge;
5 20 import org.thingsboard.server.gen.edge.ResponseMsg;
  21 +import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
6 22
7 23 public interface InitEdgeService {
8 24
9 25 void init(Edge edge, StreamObserver<ResponseMsg> outputStream);
  26 +
  27 + void initRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream);
10 28 }
... ...