Commit 8462481ebfc2960b340720569841aa29b42f58ed

Authored by Volodymyr Babak
1 parent f4baab49

Refacroting of pushing edge notification events

Showing 17 changed files with 379 additions and 195 deletions
... ... @@ -91,6 +91,9 @@ public class AlarmController extends BaseController {
91 91 logEntityAction(savedAlarm.getId(), savedAlarm,
92 92 getCurrentUser().getCustomerId(),
93 93 alarm.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null);
  94 +
  95 + sendNotificationMsgToEdgeService(getTenantId(), savedAlarm.getId(), alarm.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
  96 +
94 97 return savedAlarm;
95 98 } catch (Exception e) {
96 99 logEntityAction(emptyId(EntityType.ALARM), alarm,
... ... @@ -107,8 +110,11 @@ public class AlarmController extends BaseController {
107 110 try {
108 111 AlarmId alarmId = new AlarmId(toUUID(strAlarmId));
109 112 checkAlarmId(alarmId, Operation.WRITE);
  113 +
  114 + sendNotificationMsgToEdgeService(getTenantId(), alarmId, ActionType.DELETED);
  115 +
110 116 return alarmService.deleteAlarm(getTenantId(), alarmId);
111   - } catch (Exception e) {
  117 + } catch (Exception e) {
112 118 throw handleException(e);
113 119 }
114 120 }
... ...
... ... @@ -88,7 +88,8 @@ public class AssetController extends BaseController {
88 88
89 89 Asset savedAsset = checkNotNull(assetService.saveAsset(asset));
90 90
91   - sendNotificationMsgToEdgeService(savedAsset.getTenantId(), savedAsset.getId(), EdgeEventType.ASSET, asset.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
  91 + sendNotificationMsgToEdgeService(savedAsset.getTenantId(), null,
  92 + savedAsset.getId(), EdgeEventType.ASSET, asset.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
92 93
93 94 logEntityAction(savedAsset.getId(), savedAsset,
94 95 savedAsset.getCustomerId(),
... ... @@ -116,7 +117,7 @@ public class AssetController extends BaseController {
116 117 asset.getCustomerId(),
117 118 ActionType.DELETED, null, strAssetId);
118 119
119   - sendNotificationMsgToEdgeService(getTenantId(), assetId, EdgeEventType.ASSET, ActionType.DELETED);
  120 + sendNotificationMsgToEdgeService(getTenantId(), null, assetId, EdgeEventType.ASSET, ActionType.DELETED);
120 121 } catch (Exception e) {
121 122 logEntityAction(emptyId(EntityType.ASSET),
122 123 null,
... ... @@ -359,7 +360,7 @@ public class AssetController extends BaseController {
359 360 savedAsset.getCustomerId(),
360 361 ActionType.ASSIGNED_TO_EDGE, null, strAssetId, strEdgeId, edge.getName());
361 362
362   - sendNotificationMsgToEdgeService(getTenantId(), savedAsset.getId(), EdgeEventType.ASSET, ActionType.ASSIGNED_TO_EDGE);
  363 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedAsset.getId(), EdgeEventType.ASSET, ActionType.ASSIGNED_TO_EDGE);
363 364
364 365 return savedAsset;
365 366 } catch (Exception e) {
... ... @@ -392,7 +393,8 @@ public class AssetController extends BaseController {
392 393 asset.getCustomerId(),
393 394 ActionType.UNASSIGNED_FROM_EDGE, null, strAssetId, edge.getId().toString(), edge.getName());
394 395
395   - sendNotificationMsgToEdgeService(getTenantId(), savedAsset.getId(), EdgeEventType.ASSET, ActionType.UNASSIGNED_FROM_EDGE);
  396 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedAsset.getId(),
  397 + EdgeEventType.ASSET, ActionType.UNASSIGNED_FROM_EDGE);
396 398
397 399 return savedAsset;
398 400 } catch (Exception e) {
... ...
... ... @@ -701,7 +701,7 @@ public abstract class BaseController {
701 701
702 702 protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityRelation relation, ActionType edgeEventAction) {
703 703 try {
704   - sendNotificationMsgToEdgeService(tenantId, null, json.writeValueAsString(relation), EdgeEventType.RELATION, edgeEventAction);
  704 + sendNotificationMsgToEdgeService(tenantId, null, null, json.writeValueAsString(relation), EdgeEventType.RELATION, edgeEventAction);
705 705 } catch (Exception e) {
706 706 log.warn("Failed to push relation to core: {}", relation, e);
707 707 }
... ... @@ -710,15 +710,15 @@ public abstract class BaseController {
710 710 protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityId entityId, ActionType edgeEventAction) {
711 711 EdgeEventType edgeEventType = edgeEventService.getEdgeEventTypeByEntityType(entityId.getEntityType());
712 712 if (edgeEventType != null) {
713   - sendNotificationMsgToEdgeService(tenantId, entityId, null, edgeEventType, edgeEventAction);
  713 + sendNotificationMsgToEdgeService(tenantId, null, entityId, null, edgeEventType, edgeEventAction);
714 714 }
715 715 }
716 716
717   - protected void sendNotificationMsgToEdgeService(TenantId tenantId, EntityId entityId, EdgeEventType edgeEventType, ActionType edgeEventAction) {
718   - sendNotificationMsgToEdgeService(tenantId, entityId, null, edgeEventType, edgeEventAction);
  717 + protected void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventType edgeEventType, ActionType edgeEventAction) {
  718 + sendNotificationMsgToEdgeService(tenantId, edgeId, entityId, null, edgeEventType, edgeEventAction);
719 719 }
720 720
721   - private void sendNotificationMsgToEdgeService(TenantId tenantId, EntityId entityId, String entityBody, EdgeEventType edgeEventType, ActionType edgeEventAction) {
  721 + private void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, EntityId entityId, String entityBody, EdgeEventType edgeEventType, ActionType edgeEventAction) {
722 722 TransportProtos.EdgeNotificationMsgProto.Builder builder = TransportProtos.EdgeNotificationMsgProto.newBuilder();
723 723 builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
724 724 builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
... ... @@ -729,6 +729,10 @@ public abstract class BaseController {
729 729 builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
730 730 builder.setEntityType(entityId.getEntityType().name());
731 731 }
  732 + if (edgeId != null) {
  733 + builder.setEdgeIdMSB(edgeId.getId().getMostSignificantBits());
  734 + builder.setEdgeIdLSB(edgeId.getId().getLeastSignificantBits());
  735 + }
732 736 if (entityBody != null) {
733 737 builder.setEntityBody(entityBody);
734 738 }
... ...
... ... @@ -117,7 +117,7 @@ public class DashboardController extends BaseController {
117 117 null,
118 118 dashboard.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null);
119 119
120   - sendNotificationMsgToEdgeService(savedDashboard.getTenantId(), savedDashboard.getId(),
  120 + sendNotificationMsgToEdgeService(savedDashboard.getTenantId(), null, savedDashboard.getId(),
121 121 EdgeEventType.DASHBOARD, savedDashboard.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
122 122
123 123 return savedDashboard;
... ... @@ -143,7 +143,7 @@ public class DashboardController extends BaseController {
143 143 null,
144 144 ActionType.DELETED, null, strDashboardId);
145 145
146   - sendNotificationMsgToEdgeService(getTenantId(), dashboardId, EdgeEventType.DASHBOARD, ActionType.DELETED);
  146 + sendNotificationMsgToEdgeService(getTenantId(), null, dashboardId, EdgeEventType.DASHBOARD, ActionType.DELETED);
147 147 } catch (Exception e) {
148 148
149 149 logEntityAction(emptyId(EntityType.DASHBOARD),
... ... @@ -500,7 +500,8 @@ public class DashboardController extends BaseController {
500 500 null,
501 501 ActionType.ASSIGNED_TO_EDGE, null, strDashboardId, strEdgeId, edge.getName());
502 502
503   - sendNotificationMsgToEdgeService(getTenantId(), savedDashboard.getId(), EdgeEventType.DASHBOARD, ActionType.ASSIGNED_TO_EDGE);
  503 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedDashboard.getId(),
  504 + EdgeEventType.DASHBOARD, ActionType.ASSIGNED_TO_EDGE);
504 505
505 506 return savedDashboard;
506 507 } catch (Exception e) {
... ... @@ -532,7 +533,8 @@ public class DashboardController extends BaseController {
532 533 null,
533 534 ActionType.UNASSIGNED_FROM_EDGE, null, strDashboardId, edge.getId().toString(), edge.getName());
534 535
535   - sendNotificationMsgToEdgeService(getTenantId(), savedDashboard.getId(), EdgeEventType.DASHBOARD, ActionType.UNASSIGNED_FROM_EDGE);
  536 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedDashboard.getId(),
  537 + EdgeEventType.DASHBOARD, ActionType.UNASSIGNED_FROM_EDGE);
536 538
537 539 return savedDashboard;
538 540 } catch (Exception e) {
... ...
... ... @@ -108,7 +108,8 @@ public class DeviceController extends BaseController {
108 108 tbClusterService.pushMsgToCore(new DeviceNameOrTypeUpdateMsg(savedDevice.getTenantId(),
109 109 savedDevice.getId(), savedDevice.getName(), savedDevice.getType()), null);
110 110
111   - sendNotificationMsgToEdgeService(savedDevice.getTenantId(), savedDevice.getId(), EdgeEventType.DEVICE, device.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
  111 + sendNotificationMsgToEdgeService(savedDevice.getTenantId(), null, savedDevice.getId(),
  112 + EdgeEventType.DEVICE, device.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
112 113
113 114 logEntityAction(savedDevice.getId(), savedDevice,
114 115 savedDevice.getCustomerId(),
... ... @@ -141,7 +142,7 @@ public class DeviceController extends BaseController {
141 142 device.getCustomerId(),
142 143 ActionType.DELETED, null, strDeviceId);
143 144
144   - sendNotificationMsgToEdgeService(getTenantId(), deviceId, EdgeEventType.DEVICE, ActionType.DELETED);
  145 + sendNotificationMsgToEdgeService(getTenantId(), null, deviceId, EdgeEventType.DEVICE, ActionType.DELETED);
145 146
146 147 deviceStateService.onDeviceDeleted(device);
147 148 } catch (Exception e) {
... ... @@ -266,7 +267,7 @@ public class DeviceController extends BaseController {
266 267
267 268 tbClusterService.pushMsgToCore(new DeviceCredentialsUpdateNotificationMsg(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId()), null);
268 269
269   - sendNotificationMsgToEdgeService(getTenantId(), device.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_UPDATED);
  270 + sendNotificationMsgToEdgeService(getTenantId(), null, device.getId(), EdgeEventType.DEVICE, ActionType.CREDENTIALS_UPDATED);
270 271
271 272 logEntityAction(device.getId(), device,
272 273 device.getCustomerId(),
... ... @@ -517,7 +518,7 @@ public class DeviceController extends BaseController {
517 518 savedDevice.getCustomerId(),
518 519 ActionType.ASSIGNED_TO_EDGE, null, strDeviceId, strEdgeId, edge.getName());
519 520
520   - sendNotificationMsgToEdgeService(getTenantId(), savedDevice.getId(), EdgeEventType.DEVICE, ActionType.ASSIGNED_TO_EDGE);
  521 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedDevice.getId(), EdgeEventType.DEVICE, ActionType.ASSIGNED_TO_EDGE);
521 522
522 523 return savedDevice;
523 524 } catch (Exception e) {
... ... @@ -548,7 +549,7 @@ public class DeviceController extends BaseController {
548 549 device.getCustomerId(),
549 550 ActionType.UNASSIGNED_FROM_EDGE, null, strDeviceId, edge.getId().toString(), edge.getName());
550 551
551   - sendNotificationMsgToEdgeService(getTenantId(), savedDevice.getId(), EdgeEventType.DEVICE, ActionType.UNASSIGNED_FROM_EDGE);
  552 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedDevice.getId(), EdgeEventType.DEVICE, ActionType.UNASSIGNED_FROM_EDGE);
552 553
553 554 return savedDevice;
554 555 } catch (Exception e) {
... ...
... ... @@ -118,7 +118,8 @@ public class EntityViewController extends BaseController {
118 118 logEntityAction(savedEntityView.getId(), savedEntityView, null,
119 119 entityView.getId() == null ? ActionType.ADDED : ActionType.UPDATED, null);
120 120
121   - sendNotificationMsgToEdgeService(getTenantId(), savedEntityView.getId(), EdgeEventType.ENTITY_VIEW, entityView.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
  121 + sendNotificationMsgToEdgeService(getTenantId(), null, savedEntityView.getId(),
  122 + EdgeEventType.ENTITY_VIEW, entityView.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
122 123 return savedEntityView;
123 124 } catch (Exception e) {
124 125 logEntityAction(emptyId(EntityType.ENTITY_VIEW), entityView, null,
... ... @@ -189,7 +190,7 @@ public class EntityViewController extends BaseController {
189 190 logEntityAction(entityViewId, entityView, entityView.getCustomerId(),
190 191 ActionType.DELETED, null, strEntityViewId);
191 192
192   - sendNotificationMsgToEdgeService(getTenantId(), entityViewId, EdgeEventType.ENTITY_VIEW, ActionType.DELETED);
  193 + sendNotificationMsgToEdgeService(getTenantId(), null, entityViewId, EdgeEventType.ENTITY_VIEW, ActionType.DELETED);
193 194 } catch (Exception e) {
194 195 logEntityAction(emptyId(EntityType.ENTITY_VIEW),
195 196 null,
... ... @@ -395,7 +396,8 @@ public class EntityViewController extends BaseController {
395 396 savedEntityView.getCustomerId(),
396 397 ActionType.ASSIGNED_TO_EDGE, null, strEntityViewId, strEdgeId, edge.getName());
397 398
398   - sendNotificationMsgToEdgeService(getTenantId(), savedEntityView.getId(), EdgeEventType.ENTITY_VIEW, ActionType.ASSIGNED_TO_EDGE);
  399 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedEntityView.getId(),
  400 + EdgeEventType.ENTITY_VIEW, ActionType.ASSIGNED_TO_EDGE);
399 401
400 402 return savedEntityView;
401 403 } catch (Exception e) {
... ... @@ -425,7 +427,8 @@ public class EntityViewController extends BaseController {
425 427 entityView.getCustomerId(),
426 428 ActionType.UNASSIGNED_FROM_EDGE, null, strEntityViewId, edge.getId().toString(), edge.getName());
427 429
428   - sendNotificationMsgToEdgeService(getTenantId(), savedEntityView.getId(), EdgeEventType.ENTITY_VIEW, ActionType.UNASSIGNED_FROM_EDGE);
  430 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedEntityView.getId(),
  431 + EdgeEventType.ENTITY_VIEW, ActionType.UNASSIGNED_FROM_EDGE);
429 432
430 433 return savedEntityView;
431 434 } catch (Exception e) {
... ...
... ... @@ -145,7 +145,7 @@ public class RuleChainController extends BaseController {
145 145 created ? ActionType.ADDED : ActionType.UPDATED, null);
146 146
147 147 if (RuleChainType.EDGE.equals(savedRuleChain.getType())) {
148   - sendNotificationMsgToEdgeService(savedRuleChain.getTenantId(),
  148 + sendNotificationMsgToEdgeService(savedRuleChain.getTenantId(), null,
149 149 savedRuleChain.getId(), EdgeEventType.RULE_CHAIN,
150 150 savedRuleChain.getId() == null ? ActionType.ADDED : ActionType.UPDATED);
151 151 }
... ... @@ -226,6 +226,7 @@ public class RuleChainController extends BaseController {
226 226
227 227 if (RuleChainType.EDGE.equals(ruleChain.getType())) {
228 228 sendNotificationMsgToEdgeService(ruleChain.getTenantId(),
  229 + null,
229 230 ruleChain.getId(), EdgeEventType.RULE_CHAIN,
230 231 ActionType.UPDATED);
231 232 }
... ... @@ -292,9 +293,8 @@ public class RuleChainController extends BaseController {
292 293 ActionType.DELETED, null, strRuleChainId);
293 294
294 295 if (RuleChainType.EDGE.equals(ruleChain.getType())) {
295   - sendNotificationMsgToEdgeService(ruleChain.getTenantId(),
296   - ruleChain.getId(), EdgeEventType.RULE_CHAIN,
297   - ActionType.DELETED);
  296 + sendNotificationMsgToEdgeService(ruleChain.getTenantId(), null,
  297 + ruleChain.getId(), EdgeEventType.RULE_CHAIN, ActionType.DELETED);
298 298 }
299 299
300 300 } catch (Exception e) {
... ... @@ -426,7 +426,7 @@ public class RuleChainController extends BaseController {
426 426 null,
427 427 ActionType.ASSIGNED_TO_EDGE, null, strRuleChainId, strEdgeId, edge.getName());
428 428
429   - sendNotificationMsgToEdgeService(getTenantId(), savedRuleChain.getId(),
  429 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedRuleChain.getId(),
430 430 EdgeEventType.RULE_CHAIN, ActionType.ASSIGNED_TO_EDGE);
431 431
432 432 return savedRuleChain;
... ... @@ -459,7 +459,7 @@ public class RuleChainController extends BaseController {
459 459 null,
460 460 ActionType.UNASSIGNED_FROM_EDGE, null, strRuleChainId, edge.getId().toString(), edge.getName());
461 461
462   - sendNotificationMsgToEdgeService(getTenantId(), savedRuleChain.getId(),
  462 + sendNotificationMsgToEdgeService(getTenantId(), edgeId, savedRuleChain.getId(),
463 463 EdgeEventType.RULE_CHAIN, ActionType.UNASSIGNED_FROM_EDGE);
464 464
465 465 return savedRuleChain;
... ...
... ... @@ -36,11 +36,12 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
36 36 import org.thingsboard.server.common.data.id.IdBased;
37 37 import org.thingsboard.server.common.data.id.RuleChainId;
38 38 import org.thingsboard.server.common.data.id.TenantId;
  39 +import org.thingsboard.server.common.data.page.TextPageData;
  40 +import org.thingsboard.server.common.data.page.TextPageLink;
39 41 import org.thingsboard.server.common.data.page.TimePageData;
40 42 import org.thingsboard.server.common.data.page.TimePageLink;
41 43 import org.thingsboard.server.common.data.relation.EntityRelation;
42 44 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
43   -import org.thingsboard.server.common.data.rule.RuleChain;
44 45 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
45 46 import org.thingsboard.server.common.msg.queue.TbCallback;
46 47 import org.thingsboard.server.dao.alarm.AlarmService;
... ... @@ -141,9 +142,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
141 142 @Override
142 143 public void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) {
143 144 try {
144   - EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType());
145   - ActionType edgeEventAction = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction());
146 145 TenantId tenantId = new TenantId(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB()));
  146 + EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType());
147 147 switch (edgeEventType) {
148 148 // TODO: voba - handle edge updates
149 149 // case EDGE:
... ... @@ -152,34 +152,13 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
152 152 case ENTITY_VIEW:
153 153 case DASHBOARD:
154 154 case RULE_CHAIN:
155   - EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(edgeEventType, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
156   - ListenableFuture<List<EdgeId>> edgeIdsFuture = findRelatedEdgeIdsEntityId(tenantId, entityId);
157   - Futures.transform(edgeIdsFuture, edgeIds -> {
158   - if (edgeIds != null && !edgeIds.isEmpty()) {
159   - for (EdgeId edgeId : edgeIds) {
160   - try {
161   - saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventAction, entityId, null);
162   - if (edgeEventType.equals(EdgeEventType.RULE_CHAIN) &&
163   - (ActionType.UPDATED.equals(edgeEventAction) || ActionType.ADDED.equals(edgeEventAction))) {
164   - RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, new RuleChainId(entityId.getId()));
165   - saveEdgeEvent(tenantId, edgeId, EdgeEventType.RULE_CHAIN_METADATA, edgeEventAction, ruleChainMetaData.getRuleChainId(), null);
166   - }
167   - } catch (Exception e) {
168   - log.error("[{}] Failed to push event to edge, edgeId [{}], edgeEventType [{}], edgeEventAction [{}], entityId [{}]",
169   - tenantId, edgeId, edgeEventType, edgeEventAction, entityId, e);
170   - }
171   - }
172   - }
173   - return null;
174   - }, dbCallbackExecutorService);
  155 + processEntities(tenantId, edgeNotificationMsg);
175 156 break;
176 157 case ALARM:
177   - EntityId alarmId = EntityIdFactory.getByEdgeEventTypeAndUuid(edgeEventType, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
178   - processAlarm(tenantId, edgeEventAction, alarmId);
  158 + processAlarm(tenantId, edgeNotificationMsg);
179 159 break;
180 160 case RELATION:
181   - EntityRelation entityRelation = mapper.convertValue(edgeNotificationMsg.getEntityBody(), EntityRelation.class);
182   - processRelation(tenantId, edgeEventAction, entityRelation);
  161 + processRelation(tenantId, edgeNotificationMsg);
183 162 break;
184 163 default:
185 164 log.debug("Edge event type [{}] is not designed to be pushed to edge", edgeEventType);
... ... @@ -192,17 +171,69 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
192 171 }
193 172 }
194 173
195   - private void processAlarm(TenantId tenantId, ActionType edgeActionType, EntityId alarmId) {
196   - ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, new AlarmId(alarmId.getId()));
  174 + private void processEntities(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
  175 + ActionType edgeEventActionType = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction());
  176 + EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType());
  177 + EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(edgeEventType, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
  178 + switch (edgeEventActionType) {
  179 + // TODO: voba - ADDED is not required for CE version ?
  180 + // case ADDED:
  181 + case UPDATED:
  182 + ListenableFuture<List<EdgeId>> edgeIdsFuture = findRelatedEdgeIdsByEntityId(tenantId, entityId);
  183 + Futures.transform(edgeIdsFuture, edgeIds -> {
  184 + if (edgeIds != null && !edgeIds.isEmpty()) {
  185 + for (EdgeId edgeId : edgeIds) {
  186 + try {
  187 + saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, null);
  188 + if (edgeEventType.equals(EdgeEventType.RULE_CHAIN)) {
  189 + RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, new RuleChainId(entityId.getId()));
  190 + saveEdgeEvent(tenantId, edgeId, EdgeEventType.RULE_CHAIN_METADATA, edgeEventActionType, ruleChainMetaData.getRuleChainId(), null);
  191 + }
  192 + } catch (Exception e) {
  193 + log.error("[{}] Failed to push event to edge, edgeId [{}], edgeEventType [{}], edgeEventActionType [{}], entityId [{}]",
  194 + tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, e);
  195 + }
  196 + }
  197 + }
  198 + return null;
  199 + }, dbCallbackExecutorService);
  200 + break;
  201 + case DELETED:
  202 + TextPageData<Edge> edgesByTenantId = edgeService.findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
  203 + if (edgesByTenantId != null && edgesByTenantId.getData() != null && !edgesByTenantId.getData().isEmpty()) {
  204 + for (Edge edge : edgesByTenantId.getData()) {
  205 + saveEdgeEvent(tenantId, edge.getId(), edgeEventType, edgeEventActionType, entityId, null);
  206 + }
  207 + }
  208 + break;
  209 + case ASSIGNED_TO_EDGE:
  210 + case UNASSIGNED_FROM_EDGE:
  211 + EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
  212 + saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, null);
  213 + break;
  214 + case RELATIONS_DELETED:
  215 + // TODO: voba - add support for relations deleted
  216 + break;
  217 + }
  218 + }
  219 +
  220 + private void processAlarm(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
  221 + AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
  222 + ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
197 223 Futures.transform(alarmFuture, alarm -> {
198 224 if (alarm != null) {
199 225 EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
200 226 if (edgeEventType != null) {
201   - ListenableFuture<List<EdgeId>> relatedEdgeIdsEntityIdFuture = findRelatedEdgeIdsEntityId(tenantId, alarm.getOriginator());
202   - Futures.transform(relatedEdgeIdsEntityIdFuture, relatedEdgeIdsEntityId -> {
203   - if (relatedEdgeIdsEntityId != null) {
204   - for (EdgeId edgeId : relatedEdgeIdsEntityId) {
205   - saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, edgeActionType, alarmId, null);
  227 + ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator());
  228 + Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> {
  229 + if (relatedEdgeIdsByEntityId != null) {
  230 + for (EdgeId edgeId : relatedEdgeIdsByEntityId) {
  231 + saveEdgeEvent(tenantId,
  232 + edgeId,
  233 + EdgeEventType.ALARM,
  234 + ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()),
  235 + alarmId,
  236 + null);
206 237 }
207 238 }
208 239 return null;
... ... @@ -213,10 +244,11 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
213 244 }, dbCallbackExecutorService);
214 245 }
215 246
216   - private void processRelation(TenantId tenantId, ActionType edgeActionType, EntityRelation entityRelation) {
  247 + private void processRelation(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
  248 + EntityRelation entityRelation = mapper.convertValue(edgeNotificationMsg.getEntityBody(), EntityRelation.class);
217 249 List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>();
218   - futures.add(findRelatedEdgeIdsEntityId(tenantId, entityRelation.getTo()));
219   - futures.add(findRelatedEdgeIdsEntityId(tenantId, entityRelation.getFrom()));
  250 + futures.add(findRelatedEdgeIdsByEntityId(tenantId, entityRelation.getTo()));
  251 + futures.add(findRelatedEdgeIdsByEntityId(tenantId, entityRelation.getFrom()));
220 252 ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures);
221 253 Futures.transform(combinedFuture, listOfListsEdgeIds -> {
222 254 Set<EdgeId> uniqueEdgeIds = new HashSet<>();
... ... @@ -229,14 +261,19 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
229 261 }
230 262 if (!uniqueEdgeIds.isEmpty()) {
231 263 for (EdgeId edgeId : uniqueEdgeIds) {
232   - saveEdgeEvent(tenantId, edgeId, EdgeEventType.RELATION, edgeActionType, null, mapper.valueToTree(entityRelation));
  264 + saveEdgeEvent(tenantId,
  265 + edgeId,
  266 + EdgeEventType.RELATION,
  267 + ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()),
  268 + null,
  269 + mapper.valueToTree(entityRelation));
233 270 }
234 271 }
235 272 return null;
236 273 }, dbCallbackExecutorService);
237 274 }
238 275
239   - private ListenableFuture<List<EdgeId>> findRelatedEdgeIdsEntityId(TenantId tenantId, EntityId entityId) {
  276 + private ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) {
240 277 switch (entityId.getEntityType()) {
241 278 case DEVICE:
242 279 case ASSET:
... ...
... ... @@ -360,122 +360,193 @@ public final class EdgeGrpcSession implements Closeable {
360 360
361 361 private void processDeviceCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
362 362 DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
363   - ListenableFuture<Device> deviceFuture = ctx.getDeviceService().findDeviceByIdAsync(edgeEvent.getTenantId(), deviceId);
364   - Futures.addCallback(deviceFuture,
365   - new FutureCallback<Device>() {
366   - @Override
367   - public void onSuccess(@Nullable Device device) {
368   - if (device != null) {
369   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
370   - .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(msgType, device))
371   - .build();
372   - outputStream.onNext(ResponseMsg.newBuilder()
373   - .setEntityUpdateMsg(entityUpdateMsg)
374   - .build());
375   - }
376   - }
  363 + switch (msgType) {
  364 + case ENTITY_CREATED_RPC_MESSAGE:
  365 + case ENTITY_UPDATED_RPC_MESSAGE:
  366 + case DEVICE_CONFLICT_RPC_MESSAGE:
  367 + ListenableFuture<Device> deviceFuture = ctx.getDeviceService().findDeviceByIdAsync(edgeEvent.getTenantId(), deviceId);
  368 + Futures.addCallback(deviceFuture,
  369 + new FutureCallback<Device>() {
  370 + @Override
  371 + public void onSuccess(@Nullable Device device) {
  372 + if (device != null) {
  373 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  374 + .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(msgType, device))
  375 + .build();
  376 + outputStream.onNext(ResponseMsg.newBuilder()
  377 + .setEntityUpdateMsg(entityUpdateMsg)
  378 + .build());
  379 + }
  380 + }
  381 +
  382 + @Override
  383 + public void onFailure(Throwable t) {
  384 + log.warn("Can't processDeviceCRUD, edgeEvent [{}]", edgeEvent, t);
  385 + }
  386 + }, ctx.getDbCallbackExecutor());
  387 + break;
  388 + case ENTITY_DELETED_RPC_MESSAGE:
  389 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  390 + .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceDeleteMsg(deviceId))
  391 + .build();
  392 + outputStream.onNext(ResponseMsg.newBuilder()
  393 + .setEntityUpdateMsg(entityUpdateMsg)
  394 + .build());
  395 + }
  396 +
377 397
378   - @Override
379   - public void onFailure(Throwable t) {
380   - log.warn("Can't processDeviceCRUD, edgeEvent [{}]", edgeEvent, t);
381   - }
382   - }, ctx.getDbCallbackExecutor());
383 398 }
384 399
385 400 private void processAssetCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
386 401 AssetId assetId = new AssetId(edgeEvent.getEntityId());
387   - ListenableFuture<Asset> assetFuture = ctx.getAssetService().findAssetByIdAsync(edgeEvent.getTenantId(), assetId);
388   - Futures.addCallback(assetFuture,
389   - new FutureCallback<Asset>() {
390   - @Override
391   - public void onSuccess(@Nullable Asset asset) {
392   - if (asset != null) {
393   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
394   - .setAssetUpdateMsg(ctx.getAssetUpdateMsgConstructor().constructAssetUpdatedMsg(msgType, asset))
395   - .build();
396   - outputStream.onNext(ResponseMsg.newBuilder()
397   - .setEntityUpdateMsg(entityUpdateMsg)
398   - .build());
399   - }
400   - }
  402 + switch (msgType) {
  403 + case ENTITY_CREATED_RPC_MESSAGE:
  404 + case ENTITY_UPDATED_RPC_MESSAGE:
  405 + case DEVICE_CONFLICT_RPC_MESSAGE:
  406 + ListenableFuture<Asset> assetFuture = ctx.getAssetService().findAssetByIdAsync(edgeEvent.getTenantId(), assetId);
  407 + Futures.addCallback(assetFuture,
  408 + new FutureCallback<Asset>() {
  409 + @Override
  410 + public void onSuccess(@Nullable Asset asset) {
  411 + if (asset != null) {
  412 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  413 + .setAssetUpdateMsg(ctx.getAssetUpdateMsgConstructor().constructAssetUpdatedMsg(msgType, asset))
  414 + .build();
  415 + outputStream.onNext(ResponseMsg.newBuilder()
  416 + .setEntityUpdateMsg(entityUpdateMsg)
  417 + .build());
  418 + }
  419 + }
401 420
402   - @Override
403   - public void onFailure(Throwable t) {
404   - log.warn("Can't processAssetCRUD, edgeEvent [{}]", edgeEvent, t);
405   - }
406   - }, ctx.getDbCallbackExecutor());
  421 + @Override
  422 + public void onFailure(Throwable t) {
  423 + log.warn("Can't processAssetCRUD, edgeEvent [{}]", edgeEvent, t);
  424 + }
  425 + }, ctx.getDbCallbackExecutor());
  426 + break;
  427 + case ENTITY_DELETED_RPC_MESSAGE:
  428 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  429 + .setAssetUpdateMsg(ctx.getAssetUpdateMsgConstructor().constructAssetDeleteMsg(assetId))
  430 + .build();
  431 + outputStream.onNext(ResponseMsg.newBuilder()
  432 + .setEntityUpdateMsg(entityUpdateMsg)
  433 + .build());
  434 + break;
  435 + }
407 436 }
408 437
409 438 private void processEntityViewCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
410 439 EntityViewId entityViewId = new EntityViewId(edgeEvent.getEntityId());
411   - ListenableFuture<EntityView> entityViewFuture = ctx.getEntityViewService().findEntityViewByIdAsync(edgeEvent.getTenantId(), entityViewId);
412   - Futures.addCallback(entityViewFuture,
413   - new FutureCallback<EntityView>() {
414   - @Override
415   - public void onSuccess(@Nullable EntityView entityView) {
416   - if (entityView != null) {
417   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
418   - .setEntityViewUpdateMsg(ctx.getEntityViewUpdateMsgConstructor().constructEntityViewUpdatedMsg(msgType, entityView))
419   - .build();
420   - outputStream.onNext(ResponseMsg.newBuilder()
421   - .setEntityUpdateMsg(entityUpdateMsg)
422   - .build());
423   - }
424   - }
  440 + switch (msgType) {
  441 + case ENTITY_CREATED_RPC_MESSAGE:
  442 + case ENTITY_UPDATED_RPC_MESSAGE:
  443 + case DEVICE_CONFLICT_RPC_MESSAGE:
  444 + ListenableFuture<EntityView> entityViewFuture = ctx.getEntityViewService().findEntityViewByIdAsync(edgeEvent.getTenantId(), entityViewId);
  445 + Futures.addCallback(entityViewFuture,
  446 + new FutureCallback<EntityView>() {
  447 + @Override
  448 + public void onSuccess(@Nullable EntityView entityView) {
  449 + if (entityView != null) {
  450 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  451 + .setEntityViewUpdateMsg(ctx.getEntityViewUpdateMsgConstructor().constructEntityViewUpdatedMsg(msgType, entityView))
  452 + .build();
  453 + outputStream.onNext(ResponseMsg.newBuilder()
  454 + .setEntityUpdateMsg(entityUpdateMsg)
  455 + .build());
  456 + }
  457 + }
425 458
426   - @Override
427   - public void onFailure(Throwable t) {
428   - log.warn("Can't processEntityViewCRUD, edgeEvent [{}]", edgeEvent, t);
429   - }
430   - }, ctx.getDbCallbackExecutor());
  459 + @Override
  460 + public void onFailure(Throwable t) {
  461 + log.warn("Can't processEntityViewCRUD, edgeEvent [{}]", edgeEvent, t);
  462 + }
  463 + }, ctx.getDbCallbackExecutor());
  464 + break;
  465 + case ENTITY_DELETED_RPC_MESSAGE:
  466 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  467 + .setEntityViewUpdateMsg(ctx.getEntityViewUpdateMsgConstructor().constructEntityViewDeleteMsg(entityViewId))
  468 + .build();
  469 + outputStream.onNext(ResponseMsg.newBuilder()
  470 + .setEntityUpdateMsg(entityUpdateMsg)
  471 + .build());
  472 + break;
  473 + }
431 474 }
432 475
433 476 private void processDashboardCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
434 477 DashboardId dashboardId = new DashboardId(edgeEvent.getEntityId());
435   - ListenableFuture<Dashboard> dashboardFuture = ctx.getDashboardService().findDashboardByIdAsync(edgeEvent.getTenantId(), dashboardId);
436   - Futures.addCallback(dashboardFuture,
437   - new FutureCallback<Dashboard>() {
438   - @Override
439   - public void onSuccess(@Nullable Dashboard dashboard) {
440   - if (dashboard != null) {
441   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
442   - .setDashboardUpdateMsg(ctx.getDashboardUpdateMsgConstructor().constructDashboardUpdatedMsg(msgType, dashboard))
443   - .build();
444   - outputStream.onNext(ResponseMsg.newBuilder()
445   - .setEntityUpdateMsg(entityUpdateMsg)
446   - .build());
447   - }
448   - }
  478 + switch (msgType) {
  479 + case ENTITY_CREATED_RPC_MESSAGE:
  480 + case ENTITY_UPDATED_RPC_MESSAGE:
  481 + case DEVICE_CONFLICT_RPC_MESSAGE:
  482 + ListenableFuture<Dashboard> dashboardFuture = ctx.getDashboardService().findDashboardByIdAsync(edgeEvent.getTenantId(), dashboardId);
  483 + Futures.addCallback(dashboardFuture,
  484 + new FutureCallback<Dashboard>() {
  485 + @Override
  486 + public void onSuccess(@Nullable Dashboard dashboard) {
  487 + if (dashboard != null) {
  488 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  489 + .setDashboardUpdateMsg(ctx.getDashboardUpdateMsgConstructor().constructDashboardUpdatedMsg(msgType, dashboard))
  490 + .build();
  491 + outputStream.onNext(ResponseMsg.newBuilder()
  492 + .setEntityUpdateMsg(entityUpdateMsg)
  493 + .build());
  494 + }
  495 + }
449 496
450   - @Override
451   - public void onFailure(Throwable t) {
452   - log.warn("Can't processDashboardCRUD, edgeEvent [{}]", edgeEvent, t);
453   - }
454   - }, ctx.getDbCallbackExecutor());
  497 + @Override
  498 + public void onFailure(Throwable t) {
  499 + log.warn("Can't processDashboardCRUD, edgeEvent [{}]", edgeEvent, t);
  500 + }
  501 + }, ctx.getDbCallbackExecutor());
  502 + break;
  503 + case ENTITY_DELETED_RPC_MESSAGE:
  504 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  505 + .setDashboardUpdateMsg(ctx.getDashboardUpdateMsgConstructor().constructDashboardDeleteMsg(dashboardId))
  506 + .build();
  507 + outputStream.onNext(ResponseMsg.newBuilder()
  508 + .setEntityUpdateMsg(entityUpdateMsg)
  509 + .build());
  510 + break;
  511 + }
455 512 }
456 513
457 514 private void processRuleChainCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
458 515 RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
459   - ListenableFuture<RuleChain> ruleChainFuture = ctx.getRuleChainService().findRuleChainByIdAsync(edgeEvent.getTenantId(), ruleChainId);
460   - Futures.addCallback(ruleChainFuture,
461   - new FutureCallback<RuleChain>() {
462   - @Override
463   - public void onSuccess(@Nullable RuleChain ruleChain) {
464   - if (ruleChain != null) {
465   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
466   - .setRuleChainUpdateMsg(ctx.getRuleChainUpdateMsgConstructor().constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain))
467   - .build();
468   - outputStream.onNext(ResponseMsg.newBuilder()
469   - .setEntityUpdateMsg(entityUpdateMsg)
470   - .build());
471   - }
472   - }
  516 + switch (msgType) {
  517 + case ENTITY_CREATED_RPC_MESSAGE:
  518 + case ENTITY_UPDATED_RPC_MESSAGE:
  519 + case DEVICE_CONFLICT_RPC_MESSAGE:
  520 + ListenableFuture<RuleChain> ruleChainFuture = ctx.getRuleChainService().findRuleChainByIdAsync(edgeEvent.getTenantId(), ruleChainId);
  521 + Futures.addCallback(ruleChainFuture,
  522 + new FutureCallback<RuleChain>() {
  523 + @Override
  524 + public void onSuccess(@Nullable RuleChain ruleChain) {
  525 + if (ruleChain != null) {
  526 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  527 + .setRuleChainUpdateMsg(ctx.getRuleChainUpdateMsgConstructor().constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain))
  528 + .build();
  529 + outputStream.onNext(ResponseMsg.newBuilder()
  530 + .setEntityUpdateMsg(entityUpdateMsg)
  531 + .build());
  532 + }
  533 + }
473 534
474   - @Override
475   - public void onFailure(Throwable t) {
476   - log.warn("Can't processRuleChainCRUD, edgeEvent [{}]", edgeEvent, t);
477   - }
478   - }, ctx.getDbCallbackExecutor());
  535 + @Override
  536 + public void onFailure(Throwable t) {
  537 + log.warn("Can't processRuleChainCRUD, edgeEvent [{}]", edgeEvent, t);
  538 + }
  539 + }, ctx.getDbCallbackExecutor());
  540 + break;
  541 + case ENTITY_DELETED_RPC_MESSAGE:
  542 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  543 + .setRuleChainUpdateMsg(ctx.getRuleChainUpdateMsgConstructor().constructRuleChainDeleteMsg(ruleChainId))
  544 + .build();
  545 + outputStream.onNext(ResponseMsg.newBuilder()
  546 + .setEntityUpdateMsg(entityUpdateMsg)
  547 + .build());
  548 + break;
  549 + }
479 550 }
480 551
481 552 private void processRuleChainMetadataCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
... ... @@ -509,26 +580,40 @@ public final class EdgeGrpcSession implements Closeable {
509 580
510 581 private void processUserCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
511 582 UserId userId = new UserId(edgeEvent.getEntityId());
512   - ListenableFuture<User> userFuture = ctx.getUserService().findUserByIdAsync(edgeEvent.getTenantId(), userId);
513   - Futures.addCallback(userFuture,
514   - new FutureCallback<User>() {
515   - @Override
516   - public void onSuccess(@Nullable User user) {
517   - if (user != null) {
518   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
519   - .setUserUpdateMsg(ctx.getUserUpdateMsgConstructor().constructUserUpdatedMsg(msgType, user))
520   - .build();
521   - outputStream.onNext(ResponseMsg.newBuilder()
522   - .setEntityUpdateMsg(entityUpdateMsg)
523   - .build());
524   - }
525   - }
  583 + switch (msgType) {
  584 + case ENTITY_CREATED_RPC_MESSAGE:
  585 + case ENTITY_UPDATED_RPC_MESSAGE:
  586 + case DEVICE_CONFLICT_RPC_MESSAGE:
  587 + ListenableFuture<User> userFuture = ctx.getUserService().findUserByIdAsync(edgeEvent.getTenantId(), userId);
  588 + Futures.addCallback(userFuture,
  589 + new FutureCallback<User>() {
  590 + @Override
  591 + public void onSuccess(@Nullable User user) {
  592 + if (user != null) {
  593 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  594 + .setUserUpdateMsg(ctx.getUserUpdateMsgConstructor().constructUserUpdatedMsg(msgType, user))
  595 + .build();
  596 + outputStream.onNext(ResponseMsg.newBuilder()
  597 + .setEntityUpdateMsg(entityUpdateMsg)
  598 + .build());
  599 + }
  600 + }
526 601
527   - @Override
528   - public void onFailure(Throwable t) {
529   - log.warn("Can't processUserCRUD, edgeEvent [{}]", edgeEvent, t);
530   - }
531   - }, ctx.getDbCallbackExecutor());
  602 + @Override
  603 + public void onFailure(Throwable t) {
  604 + log.warn("Can't processUserCRUD, edgeEvent [{}]", edgeEvent, t);
  605 + }
  606 + }, ctx.getDbCallbackExecutor());
  607 + break;
  608 + case ENTITY_DELETED_RPC_MESSAGE:
  609 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  610 + .setUserUpdateMsg(ctx.getUserUpdateMsgConstructor().constructUserDeleteMsg(userId))
  611 + .build();
  612 + outputStream.onNext(ResponseMsg.newBuilder()
  613 + .setEntityUpdateMsg(entityUpdateMsg)
  614 + .build());
  615 + break;
  616 + }
532 617 }
533 618
534 619 private void processRelationCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
... ... @@ -567,9 +652,9 @@ public final class EdgeGrpcSession implements Closeable {
567 652
568 653 private UpdateMsgType getResponseMsgType(ActionType actionType) {
569 654 switch (actionType) {
570   - case ADDED:
571   - return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
572 655 case UPDATED:
  656 + return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
  657 + case ADDED:
573 658 case ASSIGNED_TO_EDGE:
574 659 return ENTITY_CREATED_RPC_MESSAGE;
575 660 case DELETED:
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.constructor;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.stereotype.Component;
20 20 import org.thingsboard.server.common.data.asset.Asset;
  21 +import org.thingsboard.server.common.data.id.AssetId;
21 22 import org.thingsboard.server.gen.edge.AssetUpdateMsg;
22 23 import org.thingsboard.server.gen.edge.UpdateMsgType;
23 24
... ... @@ -38,4 +39,10 @@ public class AssetUpdateMsgConstructor {
38 39 return builder.build();
39 40 }
40 41
  42 + public AssetUpdateMsg constructAssetDeleteMsg(AssetId assetId) {
  43 + return AssetUpdateMsg.newBuilder()
  44 + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
  45 + .setIdMSB(assetId.getId().getMostSignificantBits())
  46 + .setIdLSB(assetId.getId().getLeastSignificantBits()).build();
  47 + }
41 48 }
... ...
... ... @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Autowired;
20 20 import org.springframework.stereotype.Component;
21 21 import org.thingsboard.server.common.data.Dashboard;
  22 +import org.thingsboard.server.common.data.id.DashboardId;
22 23 import org.thingsboard.server.dao.dashboard.DashboardService;
23 24 import org.thingsboard.server.dao.util.mapping.JacksonUtil;
24 25 import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
... ... @@ -42,4 +43,11 @@ public class DashboardUpdateMsgConstructor {
42 43 return builder.build();
43 44 }
44 45
  46 + public DashboardUpdateMsg constructDashboardDeleteMsg(DashboardId dashboardId) {
  47 + return DashboardUpdateMsg.newBuilder()
  48 + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
  49 + .setIdMSB(dashboardId.getId().getMostSignificantBits())
  50 + .setIdLSB(dashboardId.getId().getLeastSignificantBits()).build();
  51 + }
  52 +
45 53 }
... ...
... ... @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Autowired;
20 20 import org.springframework.stereotype.Component;
21 21 import org.thingsboard.server.common.data.Device;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
22 23 import org.thingsboard.server.common.data.security.DeviceCredentials;
23 24 import org.thingsboard.server.dao.device.DeviceCredentialsService;
24 25 import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
... ... @@ -58,4 +59,11 @@ public class DeviceUpdateMsgConstructor {
58 59 }
59 60 return builder.build();
60 61 }
  62 +
  63 + public DeviceUpdateMsg constructDeviceDeleteMsg(DeviceId deviceId) {
  64 + return DeviceUpdateMsg.newBuilder()
  65 + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
  66 + .setIdMSB(deviceId.getId().getMostSignificantBits())
  67 + .setIdLSB(deviceId.getId().getLeastSignificantBits()).build();
  68 + }
61 69 }
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.constructor;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.stereotype.Component;
20 20 import org.thingsboard.server.common.data.EntityView;
  21 +import org.thingsboard.server.common.data.id.EntityViewId;
21 22 import org.thingsboard.server.gen.edge.EdgeEntityType;
22 23 import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
23 24 import org.thingsboard.server.gen.edge.UpdateMsgType;
... ... @@ -50,4 +51,10 @@ public class EntityViewUpdateMsgConstructor {
50 51 return builder.build();
51 52 }
52 53
  54 + public EntityViewUpdateMsg constructEntityViewDeleteMsg(EntityViewId entityViewId) {
  55 + return EntityViewUpdateMsg.newBuilder()
  56 + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
  57 + .setIdMSB(entityViewId.getId().getMostSignificantBits())
  58 + .setIdLSB(entityViewId.getId().getLeastSignificantBits()).build();
  59 + }
53 60 }
... ...
... ... @@ -19,7 +19,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.springframework.stereotype.Component;
22   -import org.thingsboard.server.common.data.edge.Edge;
23 22 import org.thingsboard.server.common.data.id.RuleChainId;
24 23 import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
25 24 import org.thingsboard.server.common.data.rule.RuleChain;
... ... @@ -126,7 +125,6 @@ public class RuleChainUpdateMsgConstructor {
126 125 .build();
127 126 }
128 127
129   -
130 128 private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException {
131 129 return RuleNodeProto.newBuilder()
132 130 .setIdMSB(node.getId().getId().getMostSignificantBits())
... ... @@ -139,4 +137,11 @@ public class RuleChainUpdateMsgConstructor {
139 137 .build();
140 138 }
141 139
  140 + public RuleChainUpdateMsg constructRuleChainDeleteMsg(RuleChainId ruleChainId) {
  141 + return RuleChainUpdateMsg.newBuilder()
  142 + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
  143 + .setIdMSB(ruleChainId.getId().getMostSignificantBits())
  144 + .setIdLSB(ruleChainId.getId().getLeastSignificantBits()).build();
  145 + }
  146 +
142 147 }
... ...
... ... @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Autowired;
20 20 import org.springframework.stereotype.Component;
21 21 import org.thingsboard.server.common.data.User;
  22 +import org.thingsboard.server.common.data.id.UserId;
22 23 import org.thingsboard.server.common.data.security.UserCredentials;
23 24 import org.thingsboard.server.dao.user.UserService;
24 25 import org.thingsboard.server.dao.util.mapping.JacksonUtil;
... ... @@ -61,4 +62,11 @@ public class UserUpdateMsgConstructor {
61 62 }
62 63 return builder.build();
63 64 }
  65 +
  66 + public UserUpdateMsg constructUserDeleteMsg(UserId userId) {
  67 + return UserUpdateMsg.newBuilder()
  68 + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
  69 + .setIdMSB(userId.getId().getMostSignificantBits())
  70 + .setIdLSB(userId.getId().getLeastSignificantBits()).build();
  71 + }
64 72 }
... ...
... ... @@ -70,7 +70,6 @@ public interface EdgeService {
70 70
71 71 ListenableFuture<List<EntitySubtype>> findEdgeTypesByTenantId(TenantId tenantId);
72 72
73   -
74 73 void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId);
75 74
76 75 ListenableFuture<TimePageData<Edge>> findEdgesByTenantIdAndRuleChainId(TenantId tenantId, RuleChainId ruleChainId, TimePageLink pageLink);
... ...
... ... @@ -356,14 +356,16 @@ message FromDeviceRPCResponseProto {
356 356 message EdgeNotificationMsgProto {
357 357 int64 tenantIdMSB = 1;
358 358 int64 tenantIdLSB = 2;
359   - string edgeEventType = 3;
360   - string edgeEventAction = 4;
361   - int64 entityIdMSB = 5;
362   - int64 entityIdLSB = 6;
363   - string entityType = 7;
364   - string entityBody = 8;
365   - PostTelemetryMsg postTelemetryMsg = 9;
366   - PostAttributeMsg postAttributesMsg = 10;
  359 + int64 edgeIdMSB = 3;
  360 + int64 edgeIdLSB = 4;
  361 + string edgeEventType = 5;
  362 + string edgeEventAction = 6;
  363 + int64 entityIdMSB = 7;
  364 + int64 entityIdLSB = 8;
  365 + string entityType = 9;
  366 + string entityBody = 10;
  367 + PostTelemetryMsg postTelemetryMsg = 11;
  368 + PostAttributeMsg postAttributesMsg = 12;
367 369 }
368 370
369 371 /**
... ...