Commit 492a14221b40aafc8af0523b3c0b6e304dcb775c

Authored by Volodymyr Babak
1 parent 1a83dbc8

Handling device creation from edge to cloud

@@ -25,6 +25,7 @@ import org.thingsboard.server.dao.asset.AssetService; @@ -25,6 +25,7 @@ import org.thingsboard.server.dao.asset.AssetService;
25 import org.thingsboard.server.dao.attributes.AttributesService; 25 import org.thingsboard.server.dao.attributes.AttributesService;
26 import org.thingsboard.server.dao.customer.CustomerService; 26 import org.thingsboard.server.dao.customer.CustomerService;
27 import org.thingsboard.server.dao.dashboard.DashboardService; 27 import org.thingsboard.server.dao.dashboard.DashboardService;
  28 +import org.thingsboard.server.dao.device.DeviceCredentialsService;
28 import org.thingsboard.server.dao.device.DeviceService; 29 import org.thingsboard.server.dao.device.DeviceService;
29 import org.thingsboard.server.dao.edge.EdgeService; 30 import org.thingsboard.server.dao.edge.EdgeService;
30 import org.thingsboard.server.dao.entityview.EntityViewService; 31 import org.thingsboard.server.dao.entityview.EntityViewService;
@@ -59,6 +60,10 @@ public class EdgeContextComponent { @@ -59,6 +60,10 @@ public class EdgeContextComponent {
59 60
60 @Lazy 61 @Lazy
61 @Autowired 62 @Autowired
  63 + private DeviceCredentialsService deviceCredentialsService;
  64 +
  65 + @Lazy
  66 + @Autowired
62 private EntityViewService entityViewService; 67 private EntityViewService entityViewService;
63 68
64 @Lazy 69 @Lazy
@@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.Customer; @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.Customer;
31 import org.thingsboard.server.common.data.Dashboard; 31 import org.thingsboard.server.common.data.Dashboard;
32 import org.thingsboard.server.common.data.DataConstants; 32 import org.thingsboard.server.common.data.DataConstants;
33 import org.thingsboard.server.common.data.Device; 33 import org.thingsboard.server.common.data.Device;
  34 +import org.thingsboard.server.common.data.EntityType;
34 import org.thingsboard.server.common.data.EntityView; 35 import org.thingsboard.server.common.data.EntityView;
35 import org.thingsboard.server.common.data.Event; 36 import org.thingsboard.server.common.data.Event;
36 import org.thingsboard.server.common.data.User; 37 import org.thingsboard.server.common.data.User;
@@ -56,6 +57,8 @@ import org.thingsboard.server.common.data.relation.EntityRelation; @@ -56,6 +57,8 @@ import org.thingsboard.server.common.data.relation.EntityRelation;
56 import org.thingsboard.server.common.data.relation.RelationTypeGroup; 57 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
57 import org.thingsboard.server.common.data.rule.RuleChain; 58 import org.thingsboard.server.common.data.rule.RuleChain;
58 import org.thingsboard.server.common.data.rule.RuleChainMetaData; 59 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
  60 +import org.thingsboard.server.common.data.security.DeviceCredentials;
  61 +import org.thingsboard.server.common.data.security.DeviceCredentialsType;
59 import org.thingsboard.server.common.msg.TbMsg; 62 import org.thingsboard.server.common.msg.TbMsg;
60 import org.thingsboard.server.common.msg.TbMsgDataType; 63 import org.thingsboard.server.common.msg.TbMsgDataType;
61 import org.thingsboard.server.common.msg.TbMsgMetaData; 64 import org.thingsboard.server.common.msg.TbMsgMetaData;
@@ -99,7 +102,7 @@ import static org.thingsboard.server.gen.edge.UpdateMsgType.ENTITY_CREATED_RPC_M @@ -99,7 +102,7 @@ import static org.thingsboard.server.gen.edge.UpdateMsgType.ENTITY_CREATED_RPC_M
99 @Data 102 @Data
100 public final class EdgeGrpcSession implements Closeable { 103 public final class EdgeGrpcSession implements Closeable {
101 104
102 - private static final ReentrantLock entityCreationLock = new ReentrantLock(); 105 + private static final ReentrantLock deviceCreationLock = new ReentrantLock();
103 106
104 private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; 107 private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
105 108
@@ -518,32 +521,15 @@ public final class EdgeGrpcSession implements Closeable { @@ -518,32 +521,15 @@ public final class EdgeGrpcSession implements Closeable {
518 for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { 521 for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
519 TbMsg tbMsg = null; 522 TbMsg tbMsg = null;
520 TbMsg originalTbMsg = TbMsg.fromBytes(entityData.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); 523 TbMsg originalTbMsg = TbMsg.fromBytes(entityData.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
521 - switch (originalTbMsg.getOriginator().getEntityType()) {  
522 - case DEVICE:  
523 - String deviceName = entityData.getEntityName();  
524 - String deviceType = entityData.getEntityType();  
525 - Device device = getOrCreateDevice(deviceName, deviceType);  
526 - if (device != null) {  
527 - tbMsg = TbMsg.newMsg(originalTbMsg.getType(), device.getId(), originalTbMsg.getMetaData().copy(),  
528 - originalTbMsg.getDataType(), originalTbMsg.getData());  
529 - }  
530 - break;  
531 - case ASSET:  
532 - String assetName = entityData.getEntityName();  
533 - Asset asset = ctx.getAssetService().findAssetByTenantIdAndName(edge.getTenantId(), assetName);  
534 - if (asset != null) {  
535 - tbMsg = TbMsg.newMsg(originalTbMsg.getType(), asset.getId(), originalTbMsg.getMetaData().copy(),  
536 - originalTbMsg.getDataType(), originalTbMsg.getData());  
537 - }  
538 - break;  
539 - case ENTITY_VIEW:  
540 - String entityViewName = entityData.getEntityName();  
541 - EntityView entityView = ctx.getEntityViewService().findEntityViewByTenantIdAndName(edge.getTenantId(), entityViewName);  
542 - if (entityView != null) {  
543 - tbMsg = TbMsg.newMsg(originalTbMsg.getType(), entityView.getId(), originalTbMsg.getMetaData().copy(),  
544 - originalTbMsg.getDataType(), originalTbMsg.getData());  
545 - }  
546 - break; 524 + if (originalTbMsg.getOriginator().getEntityType() == EntityType.DEVICE) {
  525 + String deviceName = entityData.getEntityName();
  526 + Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);
  527 + if (device != null) {
  528 + tbMsg = TbMsg.newMsg(originalTbMsg.getType(), device.getId(), originalTbMsg.getMetaData().copy(),
  529 + originalTbMsg.getDataType(), originalTbMsg.getData());
  530 + }
  531 + } else {
  532 + tbMsg = originalTbMsg;
547 } 533 }
548 if (tbMsg != null) { 534 if (tbMsg != null) {
549 ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null); 535 ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null);
@@ -552,19 +538,7 @@ public final class EdgeGrpcSession implements Closeable { @@ -552,19 +538,7 @@ public final class EdgeGrpcSession implements Closeable {
552 } 538 }
553 if (uplinkMsg.getDeviceUpdateMsgList() != null && !uplinkMsg.getDeviceUpdateMsgList().isEmpty()) { 539 if (uplinkMsg.getDeviceUpdateMsgList() != null && !uplinkMsg.getDeviceUpdateMsgList().isEmpty()) {
554 for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) { 540 for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) {
555 - String deviceName = deviceUpdateMsg.getName();  
556 - String deviceType = deviceUpdateMsg.getType();  
557 - switch (deviceUpdateMsg.getMsgType()) {  
558 - case ENTITY_CREATED_RPC_MESSAGE:  
559 - getOrCreateDevice(deviceName, deviceType);  
560 - break;  
561 - case ENTITY_DELETED_RPC_MESSAGE:  
562 - Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);  
563 - if (device != null) {  
564 - ctx.getDeviceService().unassignDeviceFromEdge(edge.getTenantId(), device.getId(), edge.getId());  
565 - }  
566 - break;  
567 - } 541 + onDeviceUpdate(deviceUpdateMsg);
568 } 542 }
569 } 543 }
570 if (uplinkMsg.getAlarmUpdateMsgList() != null && !uplinkMsg.getAlarmUpdateMsgList().isEmpty()) { 544 if (uplinkMsg.getAlarmUpdateMsgList() != null && !uplinkMsg.getAlarmUpdateMsgList().isEmpty()) {
@@ -584,6 +558,101 @@ public final class EdgeGrpcSession implements Closeable { @@ -584,6 +558,101 @@ public final class EdgeGrpcSession implements Closeable {
584 return UplinkResponseMsg.newBuilder().setSuccess(true).build(); 558 return UplinkResponseMsg.newBuilder().setSuccess(true).build();
585 } 559 }
586 560
  561 + private void onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) {
  562 + log.info("onDeviceUpdate {}", deviceUpdateMsg);
  563 + DeviceId edgeDeviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
  564 + switch (deviceUpdateMsg.getMsgType()) {
  565 + case ENTITY_CREATED_RPC_MESSAGE:
  566 + String deviceName = deviceUpdateMsg.getName();
  567 + Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);
  568 + if (device != null) {
  569 + // device with this name already exists on the cloud - update ID on the edge
  570 + if (!device.getId().equals(edgeDeviceId)) {
  571 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  572 + .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, device))
  573 + .build();
  574 + outputStream.onNext(ResponseMsg.newBuilder()
  575 + .setEntityUpdateMsg(entityUpdateMsg)
  576 + .build());
  577 + }
  578 + } else {
  579 + Device deviceById = ctx.getDeviceService().findDeviceById(edge.getTenantId(), edgeDeviceId);
  580 + if (deviceById != null) {
  581 + // this ID already used by other device - create new device and update ID on the edge
  582 + Device savedDevice = createDevice(deviceUpdateMsg);
  583 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  584 + .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, savedDevice))
  585 + .build();
  586 + outputStream.onNext(ResponseMsg.newBuilder()
  587 + .setEntityUpdateMsg(entityUpdateMsg)
  588 + .build());
  589 + } else {
  590 + createDevice(deviceUpdateMsg);
  591 + }
  592 + }
  593 + break;
  594 + case ENTITY_UPDATED_RPC_MESSAGE:
  595 + updateDevice(deviceUpdateMsg);
  596 + break;
  597 + case ENTITY_DELETED_RPC_MESSAGE:
  598 + Device deviceToDelete = ctx.getDeviceService().findDeviceById(edge.getTenantId(), edgeDeviceId);
  599 + if (deviceToDelete != null) {
  600 + ctx.getDeviceService().unassignDeviceFromEdge(edge.getTenantId(), edgeDeviceId, edge.getId());
  601 + }
  602 + break;
  603 + case UNRECOGNIZED:
  604 + log.error("Unsupported msg type");
  605 + }
  606 + }
  607 +
  608 + private void updateDevice(DeviceUpdateMsg deviceUpdateMsg) {
  609 + DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
  610 + Device device = ctx.getDeviceService().findDeviceById(edge.getTenantId(), deviceId);
  611 + device.setName(deviceUpdateMsg.getName());
  612 + device.setType(deviceUpdateMsg.getType());
  613 + device.setLabel(deviceUpdateMsg.getLabel());
  614 + device = ctx.getDeviceService().saveDevice(device);
  615 + updateDeviceCredentials(deviceUpdateMsg, device);
  616 + }
  617 +
  618 + private void updateDeviceCredentials(DeviceUpdateMsg deviceUpdateMsg, Device device) {
  619 + log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
  620 + device.getName(), deviceUpdateMsg.getCredentialsId(), deviceUpdateMsg.getCredentialsValue());
  621 +
  622 + DeviceCredentials deviceCredentials = ctx.getDeviceCredentialsService().findDeviceCredentialsByDeviceId(edge.getTenantId(), device.getId());
  623 + deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceUpdateMsg.getCredentialsType()));
  624 + deviceCredentials.setCredentialsId(deviceUpdateMsg.getCredentialsId());
  625 + deviceCredentials.setCredentialsValue(deviceUpdateMsg.getCredentialsValue());
  626 + ctx.getDeviceCredentialsService().updateDeviceCredentials(edge.getTenantId(), deviceCredentials);
  627 + log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
  628 + device.getName(), deviceUpdateMsg.getCredentialsId(), deviceUpdateMsg.getCredentialsValue());
  629 +
  630 + }
  631 +
  632 + private Device createDevice(DeviceUpdateMsg deviceUpdateMsg) {
  633 + Device device;
  634 + try {
  635 + deviceCreationLock.lock();
  636 + DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
  637 + device = new Device();
  638 + device.setTenantId(edge.getTenantId());
  639 + device.setCustomerId(edge.getCustomerId());
  640 + device.setId(deviceId);
  641 + device.setName(deviceUpdateMsg.getName());
  642 + device.setType(deviceUpdateMsg.getType());
  643 + device.setLabel(deviceUpdateMsg.getLabel());
  644 + device = ctx.getDeviceService().saveDevice(device);
  645 + device = ctx.getDeviceService().assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
  646 + createRelationFromEdge(device.getId());
  647 + ctx.getRelationService().saveRelationAsync(TenantId.SYS_TENANT_ID, new EntityRelation(edge.getId(), device.getId(), "Created"));
  648 + ctx.getDeviceStateService().onDeviceAdded(device);
  649 + updateDeviceCredentials(deviceUpdateMsg, device);
  650 + } finally {
  651 + deviceCreationLock.unlock();
  652 + }
  653 + return device;
  654 + }
  655 +
587 private EntityId getAlarmOriginator(String entityName, org.thingsboard.server.common.data.EntityType entityType) { 656 private EntityId getAlarmOriginator(String entityName, org.thingsboard.server.common.data.EntityType entityType) {
588 switch (entityType) { 657 switch (entityType) {
589 case DEVICE: 658 case DEVICE:
@@ -643,36 +712,6 @@ public final class EdgeGrpcSession implements Closeable { @@ -643,36 +712,6 @@ public final class EdgeGrpcSession implements Closeable {
643 } 712 }
644 } 713 }
645 714
646 - private Device getOrCreateDevice(String deviceName, String deviceType) {  
647 - Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);  
648 - if (device == null) {  
649 - entityCreationLock.lock();  
650 - try {  
651 - return processGetOrCreateDevice(deviceName, deviceType);  
652 - } finally {  
653 - entityCreationLock.unlock();  
654 - }  
655 - }  
656 - return device;  
657 - }  
658 -  
659 - private Device processGetOrCreateDevice(String deviceName, String deviceType) {  
660 - Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);  
661 - if (device == null) {  
662 - device = new Device();  
663 - device.setName(deviceName);  
664 - device.setType(deviceType);  
665 - device.setTenantId(edge.getTenantId());  
666 - device.setCustomerId(edge.getCustomerId());  
667 - device = ctx.getDeviceService().saveDevice(device);  
668 - device = ctx.getDeviceService().assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());  
669 - createRelationFromEdge(device.getId());  
670 - ctx.getRelationService().saveRelationAsync(TenantId.SYS_TENANT_ID, new EntityRelation(edge.getId(), device.getId(), "Created"));  
671 - ctx.getDeviceStateService().onDeviceAdded(device);  
672 - }  
673 - return device;  
674 - }  
675 -  
676 private ConnectResponseMsg processConnect(ConnectRequestMsg request) { 715 private ConnectResponseMsg processConnect(ConnectRequestMsg request) {
677 Optional<Edge> optional = ctx.getEdgeService().findEdgeByRoutingKey(TenantId.SYS_TENANT_ID, request.getEdgeRoutingKey()); 716 Optional<Edge> optional = ctx.getEdgeService().findEdgeByRoutingKey(TenantId.SYS_TENANT_ID, request.getEdgeRoutingKey());
678 if (optional.isPresent()) { 717 if (optional.isPresent()) {
@@ -94,14 +94,14 @@ enum UpdateMsgType { @@ -94,14 +94,14 @@ enum UpdateMsgType {
94 ALARM_ACK_RPC_MESSAGE = 3; 94 ALARM_ACK_RPC_MESSAGE = 3;
95 ALARM_CLEAR_RPC_MESSAGE = 4; 95 ALARM_CLEAR_RPC_MESSAGE = 4;
96 RULE_CHAIN_CUSTOM_MESSAGE = 5; 96 RULE_CHAIN_CUSTOM_MESSAGE = 5;
  97 + DEVICE_CONFLICT_RPC_MESSAGE = 6;
97 } 98 }
98 99
99 message EntityDataProto { 100 message EntityDataProto {
100 string entityName = 1; 101 string entityName = 1;
101 - string entityType = 2;  
102 - int64 entityIdMSB = 3;  
103 - int64 entityIdLSB = 4;  
104 - bytes tbMsg = 5; 102 + int64 entityIdMSB = 2;
  103 + int64 entityIdLSB = 3;
  104 + bytes tbMsg = 4;
105 } 105 }
106 106
107 message RuleChainUpdateMsg { 107 message RuleChainUpdateMsg {
@@ -156,7 +156,6 @@ message DashboardUpdateMsg { @@ -156,7 +156,6 @@ message DashboardUpdateMsg {
156 int64 idLSB = 3; 156 int64 idLSB = 3;
157 string title = 4; 157 string title = 4;
158 string configuration = 5; 158 string configuration = 5;
159 - string groupName = 6;  
160 } 159 }
161 160
162 message DeviceUpdateMsg { 161 message DeviceUpdateMsg {
@@ -169,7 +168,6 @@ message DeviceUpdateMsg { @@ -169,7 +168,6 @@ message DeviceUpdateMsg {
169 string credentialsType = 7; 168 string credentialsType = 7;
170 string credentialsId = 8; 169 string credentialsId = 8;
171 string credentialsValue = 9; 170 string credentialsValue = 9;
172 - string groupName = 10;  
173 } 171 }
174 172
175 message AssetUpdateMsg { 173 message AssetUpdateMsg {
@@ -179,7 +177,6 @@ message AssetUpdateMsg { @@ -179,7 +177,6 @@ message AssetUpdateMsg {
179 string name = 4; 177 string name = 4;
180 string type = 5; 178 string type = 5;
181 string label = 6; 179 string label = 6;
182 - string groupName = 7;  
183 } 180 }
184 181
185 message EntityViewUpdateMsg { 182 message EntityViewUpdateMsg {
@@ -191,7 +188,6 @@ message EntityViewUpdateMsg { @@ -191,7 +188,6 @@ message EntityViewUpdateMsg {
191 int64 entityIdMSB = 6; 188 int64 entityIdMSB = 6;
192 int64 entityIdLSB = 7; 189 int64 entityIdLSB = 7;
193 EdgeEntityType entityType = 8; 190 EdgeEntityType entityType = 8;
194 - string groupName = 9;  
195 } 191 }
196 192
197 message AlarmUpdateMsg { 193 message AlarmUpdateMsg {
@@ -237,7 +233,6 @@ message UserUpdateMsg { @@ -237,7 +233,6 @@ message UserUpdateMsg {
237 string additionalInfo = 8; 233 string additionalInfo = 8;
238 bool enabled = 9; 234 bool enabled = 9;
239 string password = 10; 235 string password = 10;
240 - string groupName = 11;  
241 } 236 }
242 237
243 message RuleChainMetadataRequestMsg { 238 message RuleChainMetadataRequestMsg {