Showing
5 changed files
with
53 additions
and
23 deletions
@@ -242,7 +242,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | @@ -242,7 +242,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | ||
242 | } | 242 | } |
243 | } | 243 | } |
244 | } else { | 244 | } else { |
245 | - ListenableFuture<List<EdgeId>> edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, dbCallbackExecutorService); | 245 | + ListenableFuture<List<EdgeId>> edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId); |
246 | Futures.transform(edgeIdsFuture, edgeIds -> { | 246 | Futures.transform(edgeIdsFuture, edgeIds -> { |
247 | if (edgeIds != null && !edgeIds.isEmpty()) { | 247 | if (edgeIds != null && !edgeIds.isEmpty()) { |
248 | for (EdgeId edgeId : edgeIds) { | 248 | for (EdgeId edgeId : edgeIds) { |
@@ -321,7 +321,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | @@ -321,7 +321,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | ||
321 | if (alarm != null) { | 321 | if (alarm != null) { |
322 | EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); | 322 | EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); |
323 | if (edgeEventType != null) { | 323 | if (edgeEventType != null) { |
324 | - ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), dbCallbackExecutorService); | 324 | + ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator()); |
325 | Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> { | 325 | Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> { |
326 | if (relatedEdgeIdsByEntityId != null) { | 326 | if (relatedEdgeIdsByEntityId != null) { |
327 | for (EdgeId edgeId : relatedEdgeIdsByEntityId) { | 327 | for (EdgeId edgeId : relatedEdgeIdsByEntityId) { |
@@ -346,8 +346,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | @@ -346,8 +346,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | ||
346 | if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && | 346 | if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && |
347 | !relation.getTo().getEntityType().equals(EntityType.EDGE)) { | 347 | !relation.getTo().getEntityType().equals(EntityType.EDGE)) { |
348 | List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>(); | 348 | List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>(); |
349 | - futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo(), dbCallbackExecutorService)); | ||
350 | - futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom(), dbCallbackExecutorService)); | 349 | + futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo())); |
350 | + futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom())); | ||
351 | ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures); | 351 | ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures); |
352 | Futures.transform(combinedFuture, listOfListsEdgeIds -> { | 352 | Futures.transform(combinedFuture, listOfListsEdgeIds -> { |
353 | Set<EdgeId> uniqueEdgeIds = new HashSet<>(); | 353 | Set<EdgeId> uniqueEdgeIds = new HashSet<>(); |
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback; | @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback; | ||
25 | import com.google.common.util.concurrent.Futures; | 25 | import com.google.common.util.concurrent.Futures; |
26 | import com.google.common.util.concurrent.ListenableFuture; | 26 | import com.google.common.util.concurrent.ListenableFuture; |
27 | import com.google.common.util.concurrent.MoreExecutors; | 27 | import com.google.common.util.concurrent.MoreExecutors; |
28 | +import com.google.common.util.concurrent.SettableFuture; | ||
28 | import com.google.gson.Gson; | 29 | import com.google.gson.Gson; |
29 | import com.google.gson.JsonElement; | 30 | import com.google.gson.JsonElement; |
30 | import com.google.gson.JsonObject; | 31 | import com.google.gson.JsonObject; |
@@ -937,25 +938,46 @@ public final class EdgeGrpcSession implements Closeable { | @@ -937,25 +938,46 @@ public final class EdgeGrpcSession implements Closeable { | ||
937 | } | 938 | } |
938 | 939 | ||
939 | private ListenableFuture<Void> processPostTelemetry(EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { | 940 | private ListenableFuture<Void> processPostTelemetry(EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { |
941 | + SettableFuture<Void> futureToSet = SettableFuture.create(); | ||
940 | for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { | 942 | for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { |
941 | JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); | 943 | JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); |
942 | metaData.putValue("ts", tsKv.getTs() + ""); | 944 | metaData.putValue("ts", tsKv.getTs() + ""); |
943 | TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json)); | 945 | TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json)); |
944 | - // TODO: voba - verify that null callback is OK | ||
945 | - ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null); | 946 | + ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { |
947 | + @Override | ||
948 | + public void onSuccess(TbQueueMsgMetadata metadata) { | ||
949 | + futureToSet.set(null); | ||
950 | + } | ||
951 | + | ||
952 | + @Override | ||
953 | + public void onFailure(Throwable t) { | ||
954 | + futureToSet.setException(t); | ||
955 | + } | ||
956 | + }); | ||
946 | } | 957 | } |
947 | - return Futures.immediateFuture(null); | 958 | + return futureToSet; |
948 | } | 959 | } |
949 | 960 | ||
950 | private ListenableFuture<Void> processPostAttributes(EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { | 961 | private ListenableFuture<Void> processPostAttributes(EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { |
962 | + SettableFuture<Void> futureToSet = SettableFuture.create(); | ||
951 | JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); | 963 | JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); |
952 | TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json)); | 964 | TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json)); |
953 | - // TODO: voba - verify that null callback is OK | ||
954 | - ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null); | ||
955 | - return Futures.immediateFuture(null); | 965 | + ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { |
966 | + @Override | ||
967 | + public void onSuccess(TbQueueMsgMetadata metadata) { | ||
968 | + futureToSet.set(null); | ||
969 | + } | ||
970 | + | ||
971 | + @Override | ||
972 | + public void onFailure(Throwable t) { | ||
973 | + futureToSet.setException(t); | ||
974 | + } | ||
975 | + }); | ||
976 | + return futureToSet; | ||
956 | } | 977 | } |
957 | 978 | ||
958 | private ListenableFuture<Void> processAttributeDeleteMsg(EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { | 979 | private ListenableFuture<Void> processAttributeDeleteMsg(EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { |
980 | + SettableFuture<Void> futureToSet = SettableFuture.create(); | ||
959 | try { | 981 | try { |
960 | String scope = attributeDeleteMsg.getScope(); | 982 | String scope = attributeDeleteMsg.getScope(); |
961 | List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList(); | 983 | List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList(); |
@@ -966,13 +988,23 @@ public final class EdgeGrpcSession implements Closeable { | @@ -966,13 +988,23 @@ public final class EdgeGrpcSession implements Closeable { | ||
966 | attributeKeys.add(new AttributeKey(scope, attributeName)); | 988 | attributeKeys.add(new AttributeKey(scope, attributeName)); |
967 | } | 989 | } |
968 | ctx.getTbClusterService().pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( | 990 | ctx.getTbClusterService().pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( |
969 | - edge.getTenantId(), (DeviceId) entityId, attributeKeys), null); | 991 | + edge.getTenantId(), (DeviceId) entityId, attributeKeys), new TbQueueCallback() { |
992 | + @Override | ||
993 | + public void onSuccess(TbQueueMsgMetadata metadata) { | ||
994 | + futureToSet.set(null); | ||
995 | + } | ||
996 | + | ||
997 | + @Override | ||
998 | + public void onFailure(Throwable t) { | ||
999 | + futureToSet.setException(t); | ||
1000 | + } | ||
1001 | + }); | ||
970 | } | 1002 | } |
971 | } catch (Exception e) { | 1003 | } catch (Exception e) { |
972 | log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, e); | 1004 | log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, e); |
973 | return Futures.immediateFailedFuture(new RuntimeException("Can't process attribute delete msg " + attributeDeleteMsg, e)); | 1005 | return Futures.immediateFailedFuture(new RuntimeException("Can't process attribute delete msg " + attributeDeleteMsg, e)); |
974 | } | 1006 | } |
975 | - return Futures.immediateFuture(null); | 1007 | + return futureToSet; |
976 | } | 1008 | } |
977 | 1009 | ||
978 | private ListenableFuture<Void> onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) { | 1010 | private ListenableFuture<Void> onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) { |
@@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.page.TextPageLink; | @@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.page.TextPageLink; | ||
30 | 30 | ||
31 | import java.util.List; | 31 | import java.util.List; |
32 | import java.util.Optional; | 32 | import java.util.Optional; |
33 | -import java.util.concurrent.Executor; | ||
34 | 33 | ||
35 | public interface EdgeService { | 34 | public interface EdgeService { |
36 | 35 | ||
@@ -76,7 +75,7 @@ public interface EdgeService { | @@ -76,7 +75,7 @@ public interface EdgeService { | ||
76 | 75 | ||
77 | ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(TenantId tenantId, DashboardId dashboardId); | 76 | ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(TenantId tenantId, DashboardId dashboardId); |
78 | 77 | ||
79 | - ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, Executor executorService); | 78 | + ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId); |
80 | } | 79 | } |
81 | 80 | ||
82 | 81 |
@@ -67,7 +67,6 @@ import java.util.Collections; | @@ -67,7 +67,6 @@ import java.util.Collections; | ||
67 | import java.util.Comparator; | 67 | import java.util.Comparator; |
68 | import java.util.List; | 68 | import java.util.List; |
69 | import java.util.Optional; | 69 | import java.util.Optional; |
70 | -import java.util.concurrent.Executor; | ||
71 | import java.util.stream.Collectors; | 70 | import java.util.stream.Collectors; |
72 | 71 | ||
73 | import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE; | 72 | import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE; |
@@ -429,7 +428,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic | @@ -429,7 +428,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic | ||
429 | }; | 428 | }; |
430 | 429 | ||
431 | @Override | 430 | @Override |
432 | - public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, Executor executorService) { | 431 | + public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) { |
433 | switch (entityId.getEntityType()) { | 432 | switch (entityId.getEntityType()) { |
434 | case DEVICE: | 433 | case DEVICE: |
435 | case ASSET: | 434 | case ASSET: |
@@ -442,11 +441,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic | @@ -442,11 +441,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic | ||
442 | } else { | 441 | } else { |
443 | return Collections.emptyList(); | 442 | return Collections.emptyList(); |
444 | } | 443 | } |
445 | - }, executorService); | 444 | + }, MoreExecutors.directExecutor()); |
446 | case DASHBOARD: | 445 | case DASHBOARD: |
447 | - return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())), executorService); | 446 | + return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId()))); |
448 | case RULE_CHAIN: | 447 | case RULE_CHAIN: |
449 | - return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())), executorService); | 448 | + return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId()))); |
450 | case USER: | 449 | case USER: |
451 | User userById = userService.findUserById(tenantId, new UserId(entityId.getId())); | 450 | User userById = userService.findUserById(tenantId, new UserId(entityId.getId())); |
452 | TextPageData<Edge> edges; | 451 | TextPageData<Edge> edges; |
@@ -455,20 +454,20 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic | @@ -455,20 +454,20 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic | ||
455 | } else { | 454 | } else { |
456 | edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE)); | 455 | edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE)); |
457 | } | 456 | } |
458 | - return convertToEdgeIds(Futures.immediateFuture(edges.getData()), executorService); | 457 | + return convertToEdgeIds(Futures.immediateFuture(edges.getData())); |
459 | default: | 458 | default: |
460 | return Futures.immediateFuture(Collections.emptyList()); | 459 | return Futures.immediateFuture(Collections.emptyList()); |
461 | } | 460 | } |
462 | } | 461 | } |
463 | 462 | ||
464 | - private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<List<Edge>> future, Executor executorService) { | 463 | + private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<List<Edge>> future) { |
465 | return Futures.transform(future, edges -> { | 464 | return Futures.transform(future, edges -> { |
466 | if (edges != null && !edges.isEmpty()) { | 465 | if (edges != null && !edges.isEmpty()) { |
467 | return edges.stream().map(IdBased::getId).collect(Collectors.toList()); | 466 | return edges.stream().map(IdBased::getId).collect(Collectors.toList()); |
468 | } else { | 467 | } else { |
469 | return Collections.emptyList(); | 468 | return Collections.emptyList(); |
470 | } | 469 | } |
471 | - }, executorService); | 470 | + }, MoreExecutors.directExecutor()); |
472 | } | 471 | } |
473 | 472 | ||
474 | } | 473 | } |
@@ -232,7 +232,7 @@ public class TbMsgPushToEdgeNode implements TbNode { | @@ -232,7 +232,7 @@ public class TbMsgPushToEdgeNode implements TbNode { | ||
232 | TextPageData<Edge> edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); | 232 | TextPageData<Edge> edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); |
233 | return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList())); | 233 | return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList())); |
234 | } else { | 234 | } else { |
235 | - return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId, ctx.getDbCallbackExecutor()); | 235 | + return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId); |
236 | } | 236 | } |
237 | } | 237 | } |
238 | 238 |