Commit 422ef7564e87b93dd6e38724f1bcd81404706360
Committed by
GitHub
Merge pull request #36 from BohdanSmetanyuk/feature/propagate_relations
propagate delete/update action for relations
Showing
2 changed files
with
69 additions
and
5 deletions
@@ -47,6 +47,8 @@ import org.thingsboard.server.common.data.asset.Asset; | @@ -47,6 +47,8 @@ import org.thingsboard.server.common.data.asset.Asset; | ||
47 | import org.thingsboard.server.common.data.audit.ActionType; | 47 | import org.thingsboard.server.common.data.audit.ActionType; |
48 | import org.thingsboard.server.common.data.edge.Edge; | 48 | import org.thingsboard.server.common.data.edge.Edge; |
49 | import org.thingsboard.server.common.data.edge.EdgeEvent; | 49 | import org.thingsboard.server.common.data.edge.EdgeEvent; |
50 | +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; | ||
51 | +import org.thingsboard.server.common.data.exception.ThingsboardException; | ||
50 | import org.thingsboard.server.common.data.id.AlarmId; | 52 | import org.thingsboard.server.common.data.id.AlarmId; |
51 | import org.thingsboard.server.common.data.id.AssetId; | 53 | import org.thingsboard.server.common.data.id.AssetId; |
52 | import org.thingsboard.server.common.data.id.CustomerId; | 54 | import org.thingsboard.server.common.data.id.CustomerId; |
@@ -54,6 +56,7 @@ import org.thingsboard.server.common.data.id.DashboardId; | @@ -54,6 +56,7 @@ import org.thingsboard.server.common.data.id.DashboardId; | ||
54 | import org.thingsboard.server.common.data.id.DeviceId; | 56 | import org.thingsboard.server.common.data.id.DeviceId; |
55 | import org.thingsboard.server.common.data.id.EdgeId; | 57 | import org.thingsboard.server.common.data.id.EdgeId; |
56 | import org.thingsboard.server.common.data.id.EntityId; | 58 | import org.thingsboard.server.common.data.id.EntityId; |
59 | +import org.thingsboard.server.common.data.id.EntityIdFactory; | ||
57 | import org.thingsboard.server.common.data.id.EntityViewId; | 60 | import org.thingsboard.server.common.data.id.EntityViewId; |
58 | import org.thingsboard.server.common.data.id.RuleChainId; | 61 | import org.thingsboard.server.common.data.id.RuleChainId; |
59 | import org.thingsboard.server.common.data.id.TenantId; | 62 | import org.thingsboard.server.common.data.id.TenantId; |
@@ -98,6 +101,7 @@ import org.thingsboard.server.gen.edge.EntityDataProto; | @@ -98,6 +101,7 @@ import org.thingsboard.server.gen.edge.EntityDataProto; | ||
98 | import org.thingsboard.server.gen.edge.EntityUpdateMsg; | 101 | import org.thingsboard.server.gen.edge.EntityUpdateMsg; |
99 | import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; | 102 | import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; |
100 | import org.thingsboard.server.gen.edge.RelationRequestMsg; | 103 | import org.thingsboard.server.gen.edge.RelationRequestMsg; |
104 | +import org.thingsboard.server.gen.edge.RelationUpdateMsg; | ||
101 | import org.thingsboard.server.gen.edge.RequestMsg; | 105 | import org.thingsboard.server.gen.edge.RequestMsg; |
102 | import org.thingsboard.server.gen.edge.RequestMsgType; | 106 | import org.thingsboard.server.gen.edge.RequestMsgType; |
103 | import org.thingsboard.server.gen.edge.ResponseMsg; | 107 | import org.thingsboard.server.gen.edge.ResponseMsg; |
@@ -813,6 +817,11 @@ public final class EdgeGrpcSession implements Closeable { | @@ -813,6 +817,11 @@ public final class EdgeGrpcSession implements Closeable { | ||
813 | onAlarmUpdate(alarmUpdateMsg); | 817 | onAlarmUpdate(alarmUpdateMsg); |
814 | } | 818 | } |
815 | } | 819 | } |
820 | + if (uplinkMsg.getRelationUpdateMsgList() != null && !uplinkMsg.getRelationUpdateMsgList().isEmpty()) { | ||
821 | + for (RelationUpdateMsg relationUpdateMsg: uplinkMsg.getRelationUpdateMsgList()) { | ||
822 | + onRelationUpdate(relationUpdateMsg); | ||
823 | + } | ||
824 | + } | ||
816 | if (uplinkMsg.getRuleChainMetadataRequestMsgList() != null && !uplinkMsg.getRuleChainMetadataRequestMsgList().isEmpty()) { | 825 | if (uplinkMsg.getRuleChainMetadataRequestMsgList() != null && !uplinkMsg.getRuleChainMetadataRequestMsgList().isEmpty()) { |
817 | for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) { | 826 | for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) { |
818 | ctx.getSyncEdgeService().processRuleChainMetadataRequestMsg(edge, ruleChainMetadataRequestMsg); | 827 | ctx.getSyncEdgeService().processRuleChainMetadataRequestMsg(edge, ruleChainMetadataRequestMsg); |
@@ -1164,6 +1173,60 @@ public final class EdgeGrpcSession implements Closeable { | @@ -1164,6 +1173,60 @@ public final class EdgeGrpcSession implements Closeable { | ||
1164 | } | 1173 | } |
1165 | } | 1174 | } |
1166 | 1175 | ||
1176 | + private void onRelationUpdate(RelationUpdateMsg relationUpdateMsg) { | ||
1177 | + log.info("onRelationUpdate {}", relationUpdateMsg); | ||
1178 | + try { | ||
1179 | + EntityRelation entityRelation = new EntityRelation(); | ||
1180 | + | ||
1181 | + UUID fromUUID = new UUID(relationUpdateMsg.getFromIdMSB(), relationUpdateMsg.getFromIdLSB()); | ||
1182 | + EntityId fromId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getFromEntityType()), fromUUID); | ||
1183 | + entityRelation.setFrom(fromId); | ||
1184 | + | ||
1185 | + UUID toUUID = new UUID(relationUpdateMsg.getToIdMSB(), relationUpdateMsg.getToIdLSB()); | ||
1186 | + EntityId toId = EntityIdFactory.getByTypeAndUuid(EntityType.valueOf(relationUpdateMsg.getToEntityType()), toUUID); | ||
1187 | + entityRelation.setTo(toId); | ||
1188 | + | ||
1189 | + entityRelation.setType(relationUpdateMsg.getType()); | ||
1190 | + entityRelation.setTypeGroup(RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup())); | ||
1191 | + entityRelation.setAdditionalInfo(mapper.readTree(relationUpdateMsg.getAdditionalInfo())); | ||
1192 | + switch (relationUpdateMsg.getMsgType()) { | ||
1193 | + case ENTITY_CREATED_RPC_MESSAGE: | ||
1194 | + case ENTITY_UPDATED_RPC_MESSAGE: | ||
1195 | + if (isEntityExists(edge.getTenantId(), entityRelation.getTo()) | ||
1196 | + && isEntityExists(edge.getTenantId(), entityRelation.getFrom())) { | ||
1197 | + ctx.getRelationService().saveRelationAsync(edge.getTenantId(), entityRelation); | ||
1198 | + } | ||
1199 | + break; | ||
1200 | + case ENTITY_DELETED_RPC_MESSAGE: | ||
1201 | + ctx.getRelationService().deleteRelation(edge.getTenantId(), entityRelation); | ||
1202 | + break; | ||
1203 | + case UNRECOGNIZED: | ||
1204 | + log.error("Unsupported msg type"); | ||
1205 | + } | ||
1206 | + } catch (Exception e) { | ||
1207 | + log.error("Error during relation update msg", e); | ||
1208 | + } | ||
1209 | + } | ||
1210 | + | ||
1211 | + private boolean isEntityExists(TenantId tenantId, EntityId entityId) throws ThingsboardException { | ||
1212 | + switch (entityId.getEntityType()) { | ||
1213 | + case DEVICE: | ||
1214 | + return ctx.getDeviceService().findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; | ||
1215 | + case ASSET: | ||
1216 | + return ctx.getAssetService().findAssetById(tenantId, new AssetId(entityId.getId())) != null; | ||
1217 | + case ENTITY_VIEW: | ||
1218 | + return ctx.getEntityViewService().findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null; | ||
1219 | + case CUSTOMER: | ||
1220 | + return ctx.getCustomerService().findCustomerById(tenantId, new CustomerId(entityId.getId())) != null; | ||
1221 | + case USER: | ||
1222 | + return ctx.getUserService().findUserById(tenantId, new UserId(entityId.getId())) != null; | ||
1223 | + case DASHBOARD: | ||
1224 | + return ctx.getDashboardService().findDashboardById(tenantId, new DashboardId(entityId.getId())) != null; | ||
1225 | + default: | ||
1226 | + throw new ThingsboardException("Unsupported entity type " + entityId.getEntityType(), ThingsboardErrorCode.INVALID_ARGUMENTS); | ||
1227 | + } | ||
1228 | + } | ||
1229 | + | ||
1167 | private ConnectResponseMsg processConnect(ConnectRequestMsg request) { | 1230 | private ConnectResponseMsg processConnect(ConnectRequestMsg request) { |
1168 | Optional<Edge> optional = ctx.getEdgeService().findEdgeByRoutingKey(TenantId.SYS_TENANT_ID, request.getEdgeRoutingKey()); | 1231 | Optional<Edge> optional = ctx.getEdgeService().findEdgeByRoutingKey(TenantId.SYS_TENANT_ID, request.getEdgeRoutingKey()); |
1169 | if (optional.isPresent()) { | 1232 | if (optional.isPresent()) { |
@@ -348,11 +348,12 @@ message UplinkMsg { | @@ -348,11 +348,12 @@ message UplinkMsg { | ||
348 | repeated DeviceUpdateMsg deviceUpdateMsg = 3; | 348 | repeated DeviceUpdateMsg deviceUpdateMsg = 3; |
349 | repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 4; | 349 | repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 4; |
350 | repeated AlarmUpdateMsg alarmUpdateMsg = 5; | 350 | repeated AlarmUpdateMsg alarmUpdateMsg = 5; |
351 | - repeated RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg = 6; | ||
352 | - repeated AttributesRequestMsg attributesRequestMsg = 7; | ||
353 | - repeated RelationRequestMsg relationRequestMsg = 8; | ||
354 | - repeated UserCredentialsRequestMsg userCredentialsRequestMsg = 9; | ||
355 | - repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 10; | 351 | + repeated RelationUpdateMsg relationUpdateMsg = 6; |
352 | + repeated RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg = 7; | ||
353 | + repeated AttributesRequestMsg attributesRequestMsg = 8; | ||
354 | + repeated RelationRequestMsg relationRequestMsg = 9; | ||
355 | + repeated UserCredentialsRequestMsg userCredentialsRequestMsg = 10; | ||
356 | + repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 11; | ||
356 | } | 357 | } |
357 | 358 | ||
358 | message UplinkResponseMsg { | 359 | message UplinkResponseMsg { |