Commit 2be3d9209f2439cf36163e7f9c63b7c89af7eac1

Authored by VoBa
Committed by GitHub
2 parents 787c33be 67fc2f5a

Merge pull request #39 from BohdanSmetanyuk/code_fixes

code fixes
@@ -242,7 +242,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @@ -242,7 +242,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
242 } 242 }
243 } 243 }
244 } else { 244 } else {
245 - ListenableFuture<List<EdgeId>> edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, dbCallbackExecutorService); 245 + ListenableFuture<List<EdgeId>> edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId);
246 Futures.transform(edgeIdsFuture, edgeIds -> { 246 Futures.transform(edgeIdsFuture, edgeIds -> {
247 if (edgeIds != null && !edgeIds.isEmpty()) { 247 if (edgeIds != null && !edgeIds.isEmpty()) {
248 for (EdgeId edgeId : edgeIds) { 248 for (EdgeId edgeId : edgeIds) {
@@ -321,7 +321,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @@ -321,7 +321,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
321 if (alarm != null) { 321 if (alarm != null) {
322 EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); 322 EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
323 if (edgeEventType != null) { 323 if (edgeEventType != null) {
324 - ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), dbCallbackExecutorService); 324 + ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator());
325 Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> { 325 Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> {
326 if (relatedEdgeIdsByEntityId != null) { 326 if (relatedEdgeIdsByEntityId != null) {
327 for (EdgeId edgeId : relatedEdgeIdsByEntityId) { 327 for (EdgeId edgeId : relatedEdgeIdsByEntityId) {
@@ -346,8 +346,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @@ -346,8 +346,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
346 if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) && 346 if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
347 !relation.getTo().getEntityType().equals(EntityType.EDGE)) { 347 !relation.getTo().getEntityType().equals(EntityType.EDGE)) {
348 List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>(); 348 List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>();
349 - futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo(), dbCallbackExecutorService));  
350 - futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom(), dbCallbackExecutorService)); 349 + futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo()));
  350 + futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom()));
351 ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures); 351 ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures);
352 Futures.transform(combinedFuture, listOfListsEdgeIds -> { 352 Futures.transform(combinedFuture, listOfListsEdgeIds -> {
353 Set<EdgeId> uniqueEdgeIds = new HashSet<>(); 353 Set<EdgeId> uniqueEdgeIds = new HashSet<>();
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback; @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures; 25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture; 26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors; 27 import com.google.common.util.concurrent.MoreExecutors;
  28 +import com.google.common.util.concurrent.SettableFuture;
28 import com.google.gson.Gson; 29 import com.google.gson.Gson;
29 import com.google.gson.JsonElement; 30 import com.google.gson.JsonElement;
30 import com.google.gson.JsonObject; 31 import com.google.gson.JsonObject;
@@ -937,42 +938,71 @@ public final class EdgeGrpcSession implements Closeable { @@ -937,42 +938,71 @@ public final class EdgeGrpcSession implements Closeable {
937 } 938 }
938 939
939 private ListenableFuture<Void> processPostTelemetry(EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { 940 private ListenableFuture<Void> processPostTelemetry(EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) {
  941 + SettableFuture<Void> futureToSet = SettableFuture.create();
940 for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { 942 for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
941 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); 943 JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
942 metaData.putValue("ts", tsKv.getTs() + ""); 944 metaData.putValue("ts", tsKv.getTs() + "");
943 TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json)); 945 TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json));
944 - // TODO: voba - verify that null callback is OK  
945 - ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null); 946 + ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
  947 + @Override
  948 + public void onSuccess(TbQueueMsgMetadata metadata) {
  949 + futureToSet.set(null);
  950 + }
  951 +
  952 + @Override
  953 + public void onFailure(Throwable t) {
  954 + log.error("Can't process post telemetry [{}]", msg, t);
  955 + futureToSet.setException(t);
  956 + }
  957 + });
946 } 958 }
947 - return Futures.immediateFuture(null); 959 + return futureToSet;
948 } 960 }
949 961
950 private ListenableFuture<Void> processPostAttributes(EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { 962 private ListenableFuture<Void> processPostAttributes(EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
  963 + SettableFuture<Void> futureToSet = SettableFuture.create();
951 JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); 964 JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
952 TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json)); 965 TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json));
953 - // TODO: voba - verify that null callback is OK  
954 - ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null);  
955 - return Futures.immediateFuture(null); 966 + ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
  967 + @Override
  968 + public void onSuccess(TbQueueMsgMetadata metadata) {
  969 + futureToSet.set(null);
  970 + }
  971 +
  972 + @Override
  973 + public void onFailure(Throwable t) {
  974 + log.error("Can't process post attributes [{}]", msg, t);
  975 + futureToSet.setException(t);
  976 + }
  977 + });
  978 + return futureToSet;
956 } 979 }
957 980
958 private ListenableFuture<Void> processAttributeDeleteMsg(EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { 981 private ListenableFuture<Void> processAttributeDeleteMsg(EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
959 - try {  
960 - String scope = attributeDeleteMsg.getScope();  
961 - List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();  
962 - ctx.getAttributesService().removeAll(edge.getTenantId(), entityId, scope, attributeNames);  
963 - if (EntityType.DEVICE.name().equals(entityType)) {  
964 - Set<AttributeKey> attributeKeys = new HashSet<>();  
965 - for (String attributeName : attributeNames) {  
966 - attributeKeys.add(new AttributeKey(scope, attributeName));  
967 - }  
968 - ctx.getTbClusterService().pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(  
969 - edge.getTenantId(), (DeviceId) entityId, attributeKeys), null); 982 + SettableFuture<Void> futureToSet = SettableFuture.create();
  983 + String scope = attributeDeleteMsg.getScope();
  984 + List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();
  985 + ctx.getAttributesService().removeAll(edge.getTenantId(), entityId, scope, attributeNames);
  986 + if (EntityType.DEVICE.name().equals(entityType)) {
  987 + Set<AttributeKey> attributeKeys = new HashSet<>();
  988 + for (String attributeName : attributeNames) {
  989 + attributeKeys.add(new AttributeKey(scope, attributeName));
970 } 990 }
971 - } catch (Exception e) {  
972 - log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, e);  
973 - return Futures.immediateFailedFuture(new RuntimeException("Can't process attribute delete msg " + attributeDeleteMsg, e)); 991 + ctx.getTbClusterService().pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
  992 + edge.getTenantId(), (DeviceId) entityId, attributeKeys), new TbQueueCallback() {
  993 + @Override
  994 + public void onSuccess(TbQueueMsgMetadata metadata) {
  995 + futureToSet.set(null);
  996 + }
  997 +
  998 + @Override
  999 + public void onFailure(Throwable t) {
  1000 + log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, t);
  1001 + futureToSet.setException(t);
  1002 + }
  1003 + });
974 } 1004 }
975 - return Futures.immediateFuture(null); 1005 + return futureToSet;
976 } 1006 }
977 1007
978 private ListenableFuture<Void> onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) { 1008 private ListenableFuture<Void> onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) {
@@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.page.TextPageLink; @@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.page.TextPageLink;
30 30
31 import java.util.List; 31 import java.util.List;
32 import java.util.Optional; 32 import java.util.Optional;
33 -import java.util.concurrent.Executor;  
34 33
35 public interface EdgeService { 34 public interface EdgeService {
36 35
@@ -76,7 +75,7 @@ public interface EdgeService { @@ -76,7 +75,7 @@ public interface EdgeService {
76 75
77 ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(TenantId tenantId, DashboardId dashboardId); 76 ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(TenantId tenantId, DashboardId dashboardId);
78 77
79 - ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, Executor executorService); 78 + ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId);
80 } 79 }
81 80
82 81
@@ -67,7 +67,6 @@ import java.util.Collections; @@ -67,7 +67,6 @@ import java.util.Collections;
67 import java.util.Comparator; 67 import java.util.Comparator;
68 import java.util.List; 68 import java.util.List;
69 import java.util.Optional; 69 import java.util.Optional;
70 -import java.util.concurrent.Executor;  
71 import java.util.stream.Collectors; 70 import java.util.stream.Collectors;
72 71
73 import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE; 72 import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE;
@@ -429,7 +428,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic @@ -429,7 +428,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
429 }; 428 };
430 429
431 @Override 430 @Override
432 - public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, Executor executorService) { 431 + public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) {
433 switch (entityId.getEntityType()) { 432 switch (entityId.getEntityType()) {
434 case DEVICE: 433 case DEVICE:
435 case ASSET: 434 case ASSET:
@@ -442,11 +441,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic @@ -442,11 +441,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
442 } else { 441 } else {
443 return Collections.emptyList(); 442 return Collections.emptyList();
444 } 443 }
445 - }, executorService); 444 + }, MoreExecutors.directExecutor());
446 case DASHBOARD: 445 case DASHBOARD:
447 - return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())), executorService); 446 + return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())));
448 case RULE_CHAIN: 447 case RULE_CHAIN:
449 - return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())), executorService); 448 + return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())));
450 case USER: 449 case USER:
451 User userById = userService.findUserById(tenantId, new UserId(entityId.getId())); 450 User userById = userService.findUserById(tenantId, new UserId(entityId.getId()));
452 TextPageData<Edge> edges; 451 TextPageData<Edge> edges;
@@ -455,20 +454,20 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic @@ -455,20 +454,20 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
455 } else { 454 } else {
456 edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE)); 455 edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE));
457 } 456 }
458 - return convertToEdgeIds(Futures.immediateFuture(edges.getData()), executorService); 457 + return convertToEdgeIds(Futures.immediateFuture(edges.getData()));
459 default: 458 default:
460 return Futures.immediateFuture(Collections.emptyList()); 459 return Futures.immediateFuture(Collections.emptyList());
461 } 460 }
462 } 461 }
463 462
464 - private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<List<Edge>> future, Executor executorService) { 463 + private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<List<Edge>> future) {
465 return Futures.transform(future, edges -> { 464 return Futures.transform(future, edges -> {
466 if (edges != null && !edges.isEmpty()) { 465 if (edges != null && !edges.isEmpty()) {
467 return edges.stream().map(IdBased::getId).collect(Collectors.toList()); 466 return edges.stream().map(IdBased::getId).collect(Collectors.toList());
468 } else { 467 } else {
469 return Collections.emptyList(); 468 return Collections.emptyList();
470 } 469 }
471 - }, executorService); 470 + }, MoreExecutors.directExecutor());
472 } 471 }
473 472
474 } 473 }
@@ -232,7 +232,7 @@ public class TbMsgPushToEdgeNode implements TbNode { @@ -232,7 +232,7 @@ public class TbMsgPushToEdgeNode implements TbNode {
232 TextPageData<Edge> edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); 232 TextPageData<Edge> edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
233 return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList())); 233 return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList()));
234 } else { 234 } else {
235 - return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId, ctx.getDbCallbackExecutor()); 235 + return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId);
236 } 236 }
237 } 237 }
238 238