...
|
...
|
@@ -236,32 +236,35 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { |
236
|
236
|
}
|
237
|
237
|
|
238
|
238
|
private void processRelation(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
239
|
|
- EntityRelation entityRelation = mapper.convertValue(edgeNotificationMsg.getEntityBody(), EntityRelation.class);
|
240
|
|
- List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>();
|
241
|
|
- futures.add(findRelatedEdgeIdsByEntityId(tenantId, entityRelation.getTo()));
|
242
|
|
- futures.add(findRelatedEdgeIdsByEntityId(tenantId, entityRelation.getFrom()));
|
243
|
|
- ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures);
|
244
|
|
- Futures.transform(combinedFuture, listOfListsEdgeIds -> {
|
245
|
|
- Set<EdgeId> uniqueEdgeIds = new HashSet<>();
|
246
|
|
- if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) {
|
247
|
|
- for (List<EdgeId> listOfListsEdgeId : listOfListsEdgeIds) {
|
248
|
|
- if (listOfListsEdgeId != null) {
|
249
|
|
- uniqueEdgeIds.addAll(listOfListsEdgeId);
|
|
239
|
+ EntityRelation relation = mapper.convertValue(edgeNotificationMsg.getEntityBody(), EntityRelation.class);
|
|
240
|
+ if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
|
|
241
|
+ !relation.getTo().getEntityType().equals(EntityType.EDGE)) {
|
|
242
|
+ List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>();
|
|
243
|
+ futures.add(findRelatedEdgeIdsByEntityId(tenantId, relation.getTo()));
|
|
244
|
+ futures.add(findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom()));
|
|
245
|
+ ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures);
|
|
246
|
+ Futures.transform(combinedFuture, listOfListsEdgeIds -> {
|
|
247
|
+ Set<EdgeId> uniqueEdgeIds = new HashSet<>();
|
|
248
|
+ if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) {
|
|
249
|
+ for (List<EdgeId> listOfListsEdgeId : listOfListsEdgeIds) {
|
|
250
|
+ if (listOfListsEdgeId != null) {
|
|
251
|
+ uniqueEdgeIds.addAll(listOfListsEdgeId);
|
|
252
|
+ }
|
250
|
253
|
}
|
251
|
254
|
}
|
252
|
|
- }
|
253
|
|
- if (!uniqueEdgeIds.isEmpty()) {
|
254
|
|
- for (EdgeId edgeId : uniqueEdgeIds) {
|
255
|
|
- saveEdgeEvent(tenantId,
|
256
|
|
- edgeId,
|
257
|
|
- EdgeEventType.RELATION,
|
258
|
|
- ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()),
|
259
|
|
- null,
|
260
|
|
- mapper.valueToTree(entityRelation));
|
|
255
|
+ if (!uniqueEdgeIds.isEmpty()) {
|
|
256
|
+ for (EdgeId edgeId : uniqueEdgeIds) {
|
|
257
|
+ saveEdgeEvent(tenantId,
|
|
258
|
+ edgeId,
|
|
259
|
+ EdgeEventType.RELATION,
|
|
260
|
+ ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()),
|
|
261
|
+ null,
|
|
262
|
+ mapper.valueToTree(relation));
|
|
263
|
+ }
|
261
|
264
|
}
|
262
|
|
- }
|
263
|
|
- return null;
|
264
|
|
- }, dbCallbackExecutorService);
|
|
265
|
+ return null;
|
|
266
|
+ }, dbCallbackExecutorService);
|
|
267
|
+ }
|
265
|
268
|
}
|
266
|
269
|
|
267
|
270
|
private ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) {
|
...
|
...
|
|