Showing
8 changed files
with
72 additions
and
64 deletions
... | ... | @@ -224,6 +224,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
224 | 224 | if (targetCtx != null) { |
225 | 225 | log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId); |
226 | 226 | pushMsgToNode(targetCtx, msg, ""); |
227 | + pushUpdatesToEdges(msg); | |
227 | 228 | } else { |
228 | 229 | log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId); |
229 | 230 | msg.getCallback().onSuccess(); |
... | ... | @@ -346,7 +347,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh |
346 | 347 | |
347 | 348 | private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) { |
348 | 349 | if (nodeCtx != null) { |
349 | - pushUpdatesToEdges(msg); | |
350 | 350 | nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType), self); |
351 | 351 | } else { |
352 | 352 | log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName); | ... | ... |
... | ... | @@ -274,7 +274,7 @@ public final class EdgeGrpcSession implements Closeable { |
274 | 274 | , objectMapper.writeValueAsString(entityNode)); |
275 | 275 | log.debug("Sending donwlink entity data msg, entityName [{}], tbMsg [{}]", finalEntityName, tbMsg); |
276 | 276 | outputStream.onNext(ResponseMsg.newBuilder() |
277 | - .setDownlinkMsg(constructDownlinkEntityDataMsg(finalEntityName, tbMsg)) | |
277 | + .setDownlinkMsg(constructDownlinkEntityDataMsg(finalEntityName, finalEntityId, tbMsg)) | |
278 | 278 | .build()); |
279 | 279 | } catch (Exception e) { |
280 | 280 | log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e); |
... | ... | @@ -291,25 +291,29 @@ public final class EdgeGrpcSession implements Closeable { |
291 | 291 | log.trace("Executing processCustomDownlinkMessage, entry [{}]", entry); |
292 | 292 | TbMsg tbMsg = TbMsg.fromBytes(Base64.decodeBase64(entry.getData()), TbMsgCallback.EMPTY); |
293 | 293 | String entityName = null; |
294 | + EntityId entityId = null; | |
294 | 295 | switch (entry.getEntityType()) { |
295 | 296 | case DEVICE: |
296 | 297 | Device device = ctx.getDeviceService().findDeviceById(edge.getTenantId(), new DeviceId(tbMsg.getOriginator().getId())); |
297 | 298 | entityName = device.getName(); |
299 | + entityId = device.getId(); | |
298 | 300 | break; |
299 | 301 | case ASSET: |
300 | 302 | Asset asset = ctx.getAssetService().findAssetById(edge.getTenantId(), new AssetId(tbMsg.getOriginator().getId())); |
301 | 303 | entityName = asset.getName(); |
304 | + entityId = asset.getId(); | |
302 | 305 | break; |
303 | 306 | case ENTITY_VIEW: |
304 | 307 | EntityView entityView = ctx.getEntityViewService().findEntityViewById(edge.getTenantId(), new EntityViewId(tbMsg.getOriginator().getId())); |
305 | 308 | entityName = entityView.getName(); |
309 | + entityId = entityView.getId(); | |
306 | 310 | break; |
307 | 311 | |
308 | 312 | } |
309 | - if (entityName != null) { | |
310 | - log.debug("Sending donwlink entity data msg, entityName [{}], tbMsg [{}]", entityName, tbMsg); | |
313 | + if (entityName != null && entityId != null) { | |
314 | + log.debug("Sending downlink entity data msg, entityName [{}], tbMsg [{}]", entityName, tbMsg); | |
311 | 315 | outputStream.onNext(ResponseMsg.newBuilder() |
312 | - .setDownlinkMsg(constructDownlinkEntityDataMsg(entityName, tbMsg)) | |
316 | + .setDownlinkMsg(constructDownlinkEntityDataMsg(entityName, entityId, tbMsg)) | |
313 | 317 | .build()); |
314 | 318 | } |
315 | 319 | } |
... | ... | @@ -482,10 +486,13 @@ public final class EdgeGrpcSession implements Closeable { |
482 | 486 | } |
483 | 487 | } |
484 | 488 | |
485 | - private DownlinkMsg constructDownlinkEntityDataMsg(String entityName, TbMsg tbMsg) { | |
489 | + private DownlinkMsg constructDownlinkEntityDataMsg(String entityName, EntityId entityId, TbMsg tbMsg) { | |
486 | 490 | EntityDataProto entityData = EntityDataProto.newBuilder() |
487 | 491 | .setEntityName(entityName) |
488 | - .setTbMsg(ByteString.copyFrom(TbMsg.toByteArray(tbMsg))).build(); | |
492 | + .setTbMsg(ByteString.copyFrom(TbMsg.toByteArray(tbMsg))) | |
493 | + .setEntityIdMSB(entityId.getId().getMostSignificantBits()) | |
494 | + .setEntityIdLSB(entityId.getId().getLeastSignificantBits()) | |
495 | + .build(); | |
489 | 496 | |
490 | 497 | DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() |
491 | 498 | .addAllEntityData(Collections.singletonList(entityData)); | ... | ... |
... | ... | @@ -28,6 +28,8 @@ public class AssetUpdateMsgConstructor { |
28 | 28 | public AssetUpdateMsg constructAssetUpdatedMsg(UpdateMsgType msgType, Asset asset) { |
29 | 29 | AssetUpdateMsg.Builder builder = AssetUpdateMsg.newBuilder() |
30 | 30 | .setMsgType(msgType) |
31 | + .setIdMSB(asset.getId().getId().getMostSignificantBits()) | |
32 | + .setIdLSB(asset.getId().getId().getLeastSignificantBits()) | |
31 | 33 | .setName(asset.getName()) |
32 | 34 | .setType(asset.getType()); |
33 | 35 | if (asset.getLabel() != null) { | ... | ... |
... | ... | @@ -34,6 +34,8 @@ public class DeviceUpdateMsgConstructor { |
34 | 34 | public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) { |
35 | 35 | DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder() |
36 | 36 | .setMsgType(msgType) |
37 | + .setIdMSB(device.getId().getId().getMostSignificantBits()) | |
38 | + .setIdLSB(device.getId().getId().getLeastSignificantBits()) | |
37 | 39 | .setName(device.getName()) |
38 | 40 | .setType(device.getType()); |
39 | 41 | if (device.getLabel() != null) { | ... | ... |
... | ... | @@ -16,16 +16,8 @@ |
16 | 16 | package org.thingsboard.server.service.edge.rpc.constructor; |
17 | 17 | |
18 | 18 | import lombok.extern.slf4j.Slf4j; |
19 | -import org.springframework.beans.factory.annotation.Autowired; | |
20 | 19 | import org.springframework.stereotype.Component; |
21 | -import org.thingsboard.server.common.data.Device; | |
22 | -import org.thingsboard.server.common.data.EntityType; | |
23 | 20 | import org.thingsboard.server.common.data.EntityView; |
24 | -import org.thingsboard.server.common.data.asset.Asset; | |
25 | -import org.thingsboard.server.common.data.id.AssetId; | |
26 | -import org.thingsboard.server.common.data.id.DeviceId; | |
27 | -import org.thingsboard.server.dao.asset.AssetService; | |
28 | -import org.thingsboard.server.dao.device.DeviceService; | |
29 | 21 | import org.thingsboard.server.gen.edge.EdgeEntityType; |
30 | 22 | import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; |
31 | 23 | import org.thingsboard.server.gen.edge.UpdateMsgType; |
... | ... | @@ -34,34 +26,27 @@ import org.thingsboard.server.gen.edge.UpdateMsgType; |
34 | 26 | @Slf4j |
35 | 27 | public class EntityViewUpdateMsgConstructor { |
36 | 28 | |
37 | - @Autowired | |
38 | - private DeviceService deviceService; | |
39 | - | |
40 | - @Autowired | |
41 | - private AssetService assetService; | |
42 | - | |
43 | 29 | public EntityViewUpdateMsg constructEntityViewUpdatedMsg(UpdateMsgType msgType, EntityView entityView) { |
44 | - String relatedName; | |
45 | - String relatedType; | |
46 | - EdgeEntityType relatedEntityType; | |
47 | - if (entityView.getEntityId().getEntityType().equals(EntityType.DEVICE)) { | |
48 | - Device device = deviceService.findDeviceById(entityView.getTenantId(), new DeviceId(entityView.getEntityId().getId())); | |
49 | - relatedName = device.getName(); | |
50 | - relatedType = device.getType(); | |
51 | - relatedEntityType = EdgeEntityType.DEVICE; | |
52 | - } else { | |
53 | - Asset asset = assetService.findAssetById(entityView.getTenantId(), new AssetId(entityView.getEntityId().getId())); | |
54 | - relatedName = asset.getName(); | |
55 | - relatedType = asset.getType(); | |
56 | - relatedEntityType = EdgeEntityType.ASSET; | |
30 | + EdgeEntityType entityType; | |
31 | + switch (entityView.getEntityId().getEntityType()) { | |
32 | + case DEVICE: | |
33 | + entityType = EdgeEntityType.DEVICE; | |
34 | + break; | |
35 | + case ASSET: | |
36 | + entityType = EdgeEntityType.ASSET; | |
37 | + break; | |
38 | + default: | |
39 | + throw new RuntimeException("Unsupported entity type [" + entityView.getEntityId().getEntityType() + "]"); | |
57 | 40 | } |
58 | 41 | EntityViewUpdateMsg.Builder builder = EntityViewUpdateMsg.newBuilder() |
59 | 42 | .setMsgType(msgType) |
43 | + .setIdMSB(entityView.getId().getId().getMostSignificantBits()) | |
44 | + .setIdLSB(entityView.getId().getId().getLeastSignificantBits()) | |
60 | 45 | .setName(entityView.getName()) |
61 | 46 | .setType(entityView.getType()) |
62 | - .setRelatedName(relatedName) | |
63 | - .setRelatedType(relatedType) | |
64 | - .setRelatedEntityType(relatedEntityType); | |
47 | + .setIdMSB(entityView.getEntityId().getId().getMostSignificantBits()) | |
48 | + .setIdLSB(entityView.getEntityId().getId().getLeastSignificantBits()) | |
49 | + .setEntityType(entityType); | |
65 | 50 | return builder.build(); |
66 | 51 | } |
67 | 52 | ... | ... |
... | ... | @@ -35,6 +35,8 @@ public class UserUpdateMsgConstructor { |
35 | 35 | public UserUpdateMsg constructUserUpdatedMsg(UpdateMsgType msgType, User user) { |
36 | 36 | UserUpdateMsg.Builder builder = UserUpdateMsg.newBuilder() |
37 | 37 | .setMsgType(msgType) |
38 | + .setIdMSB(user.getId().getId().getMostSignificantBits()) | |
39 | + .setIdLSB(user.getId().getId().getLeastSignificantBits()) | |
38 | 40 | .setEmail(user.getEmail()) |
39 | 41 | .setAuthority(user.getAuthority().name()) |
40 | 42 | .setEnabled(false); | ... | ... |
... | ... | @@ -141,7 +141,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { |
141 | 141 | log.debug("[{}] Entity update message received {}", edgeKey, responseMsg.getEntityUpdateMsg()); |
142 | 142 | onEntityUpdate.accept(responseMsg.getEntityUpdateMsg()); |
143 | 143 | } else if (responseMsg.hasDownlinkMsg()) { |
144 | - log.debug("[{}] Downlink message received for rule chain {}", edgeKey, responseMsg.getDownlinkMsg()); | |
144 | + log.debug("[{}] Downlink message received {}", edgeKey, responseMsg.getDownlinkMsg()); | |
145 | 145 | onDownlink.accept(responseMsg.getDownlinkMsg()); |
146 | 146 | } |
147 | 147 | } | ... | ... |
... | ... | @@ -99,7 +99,9 @@ enum UpdateMsgType { |
99 | 99 | message EntityDataProto { |
100 | 100 | string entityName = 1; |
101 | 101 | string entityType = 2; |
102 | - bytes tbMsg = 3; | |
102 | + int64 entityIdMSB = 3; | |
103 | + int64 entityIdLSB = 4; | |
104 | + bytes tbMsg = 5; | |
103 | 105 | } |
104 | 106 | |
105 | 107 | message RuleChainUpdateMsg { |
... | ... | @@ -159,31 +161,37 @@ message DashboardUpdateMsg { |
159 | 161 | |
160 | 162 | message DeviceUpdateMsg { |
161 | 163 | UpdateMsgType msgType = 1; |
162 | - string name = 2; | |
163 | - string type = 3; | |
164 | - string label = 4; | |
165 | - string credentialsType = 5; | |
166 | - string credentialsId = 6; | |
167 | - string credentialsValue = 7; | |
168 | - string groupName = 8; | |
164 | + int64 idMSB = 2; | |
165 | + int64 idLSB = 3; | |
166 | + string name = 4; | |
167 | + string type = 5; | |
168 | + string label = 6; | |
169 | + string credentialsType = 7; | |
170 | + string credentialsId = 8; | |
171 | + string credentialsValue = 9; | |
172 | + string groupName = 10; | |
169 | 173 | } |
170 | 174 | |
171 | 175 | message AssetUpdateMsg { |
172 | 176 | UpdateMsgType msgType = 1; |
173 | - string name = 2; | |
174 | - string type = 3; | |
175 | - string label = 4; | |
176 | - string groupName = 5; | |
177 | + int64 idMSB = 2; | |
178 | + int64 idLSB = 3; | |
179 | + string name = 4; | |
180 | + string type = 5; | |
181 | + string label = 6; | |
182 | + string groupName = 7; | |
177 | 183 | } |
178 | 184 | |
179 | 185 | message EntityViewUpdateMsg { |
180 | 186 | UpdateMsgType msgType = 1; |
181 | - string name = 2; | |
182 | - string type = 3; | |
183 | - string relatedName = 4; | |
184 | - string relatedType = 5; | |
185 | - EdgeEntityType relatedEntityType = 6; | |
186 | - string groupName = 7; | |
187 | + int64 idMSB = 2; | |
188 | + int64 idLSB = 3; | |
189 | + string name = 4; | |
190 | + string type = 5; | |
191 | + int64 entityIdMSB = 6; | |
192 | + int64 entityIdLSB = 7; | |
193 | + EdgeEntityType entityType = 8; | |
194 | + string groupName = 9; | |
187 | 195 | } |
188 | 196 | |
189 | 197 | message AlarmUpdateMsg { |
... | ... | @@ -220,14 +228,16 @@ message CustomerUpdateMsg { |
220 | 228 | |
221 | 229 | message UserUpdateMsg { |
222 | 230 | UpdateMsgType msgType = 1; |
223 | - string email = 2; | |
224 | - string authority = 3; | |
225 | - string firstName = 4; | |
226 | - string lastName = 5; | |
227 | - string additionalInfo = 6; | |
228 | - bool enabled = 7; | |
229 | - string password = 8; | |
230 | - string groupName = 9; | |
231 | + int64 idMSB = 2; | |
232 | + int64 idLSB = 3; | |
233 | + string email = 4; | |
234 | + string authority = 5; | |
235 | + string firstName = 6; | |
236 | + string lastName = 7; | |
237 | + string additionalInfo = 8; | |
238 | + bool enabled = 9; | |
239 | + string password = 10; | |
240 | + string groupName = 11; | |
231 | 241 | } |
232 | 242 | |
233 | 243 | message RuleChainMetadataRequestMsg { | ... | ... |