Commit 581f23b0b067bdfb06c9a9955eb4dd0331db4187

Authored by Bohdan Smetaniuk
1 parent a676fa9c

fixes + processing attributes delete msg from edge

... ... @@ -242,7 +242,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
242 242 }
243 243 }
244 244 } else {
245   - ListenableFuture<List<EdgeId>> edgeIdsFuture = findRelatedEdgeIdsByEntityId(tenantId, entityId);
  245 + ListenableFuture<List<EdgeId>> edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, dbCallbackExecutorService);
246 246 Futures.transform(edgeIdsFuture, edgeIds -> {
247 247 if (edgeIds != null && !edgeIds.isEmpty()) {
248 248 for (EdgeId edgeId : edgeIds) {
... ... @@ -321,7 +321,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
321 321 if (alarm != null) {
322 322 EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
323 323 if (edgeEventType != null) {
324   - ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator());
  324 + ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), dbCallbackExecutorService);
325 325 Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> {
326 326 if (relatedEdgeIdsByEntityId != null) {
327 327 for (EdgeId edgeId : relatedEdgeIdsByEntityId) {
... ... @@ -346,8 +346,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
346 346 if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
347 347 !relation.getTo().getEntityType().equals(EntityType.EDGE)) {
348 348 List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>();
349   - futures.add(findRelatedEdgeIdsByEntityId(tenantId, relation.getTo()));
350   - futures.add(findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom()));
  349 + futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo(), dbCallbackExecutorService));
  350 + futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom(), dbCallbackExecutorService));
351 351 ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures);
352 352 Futures.transform(combinedFuture, listOfListsEdgeIds -> {
353 353 Set<EdgeId> uniqueEdgeIds = new HashSet<>();
... ... @@ -373,48 +373,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
373 373 }
374 374 }
375 375
376   - private ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) {
377   - switch (entityId.getEntityType()) {
378   - case DEVICE:
379   - case ASSET:
380   - case ENTITY_VIEW:
381   - ListenableFuture<List<EntityRelation>> originatorEdgeRelationsFuture =
382   - relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
383   - return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> {
384   - if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) {
385   - return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId()));
386   - } else {
387   - return Collections.emptyList();
388   - }
389   - }, dbCallbackExecutorService);
390   - case DASHBOARD:
391   - return convertToEdgeIds(edgeService.findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())));
392   - case RULE_CHAIN:
393   - return convertToEdgeIds(edgeService.findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())));
394   - case USER:
395   - User userById = userService.findUserById(tenantId, new UserId(entityId.getId()));
396   - TextPageData<Edge> edges;
397   - if (userById.getCustomerId() == null || userById.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
398   - edges = edgeService.findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
399   - } else {
400   - edges = edgeService.findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE));
401   - }
402   - return convertToEdgeIds(Futures.immediateFuture(edges.getData()));
403   - default:
404   - return Futures.immediateFuture(Collections.emptyList());
405   - }
406   - }
407   -
408   - private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<List<Edge>> future) {
409   - return Futures.transform(future, edges -> {
410   - if (edges != null && !edges.isEmpty()) {
411   - return edges.stream().map(IdBased::getId).collect(Collectors.toList());
412   - } else {
413   - return Collections.emptyList();
414   - }
415   - }, dbCallbackExecutorService);
416   - }
417   -
418 376 private EdgeEventType getEdgeQueueTypeByEntityType(EntityType entityType) {
419 377 switch (entityType) {
420 378 case DEVICE:
... ...
... ... @@ -33,6 +33,7 @@ import lombok.Data;
33 33 import lombok.extern.slf4j.Slf4j;
34 34 import org.apache.commons.lang.RandomStringUtils;
35 35 import org.checkerframework.checker.nullness.qual.Nullable;
  36 +import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
36 37 import org.thingsboard.server.common.data.AdminSettings;
37 38 import org.thingsboard.server.common.data.Customer;
38 39 import org.thingsboard.server.common.data.Dashboard;
... ... @@ -64,6 +65,7 @@ import org.thingsboard.server.common.data.id.TenantId;
64 65 import org.thingsboard.server.common.data.id.UserId;
65 66 import org.thingsboard.server.common.data.id.WidgetTypeId;
66 67 import org.thingsboard.server.common.data.id.WidgetsBundleId;
  68 +import org.thingsboard.server.common.data.kv.AttributeKey;
67 69 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
68 70 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
69 71 import org.thingsboard.server.common.data.kv.LongDataEntry;
... ... @@ -88,6 +90,7 @@ import org.thingsboard.server.common.transport.util.JsonUtils;
88 90 import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg;
89 91 import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
90 92 import org.thingsboard.server.gen.edge.AssetUpdateMsg;
  93 +import org.thingsboard.server.gen.edge.AttributeDeleteMsg;
91 94 import org.thingsboard.server.gen.edge.AttributesRequestMsg;
92 95 import org.thingsboard.server.gen.edge.ConnectRequestMsg;
93 96 import org.thingsboard.server.gen.edge.ConnectResponseCode;
... ... @@ -125,11 +128,12 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
125 128 import org.thingsboard.server.service.edge.EdgeContextComponent;
126 129
127 130 import java.io.Closeable;
128   -import java.io.IOException;
129 131 import java.util.ArrayList;
130 132 import java.util.Collections;
  133 +import java.util.HashSet;
131 134 import java.util.List;
132 135 import java.util.Optional;
  136 +import java.util.Set;
133 137 import java.util.UUID;
134 138 import java.util.concurrent.CountDownLatch;
135 139 import java.util.concurrent.ExecutionException;
... ... @@ -382,7 +386,7 @@ public final class EdgeGrpcSession implements Closeable {
382 386 ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
383 387 }
384 388
385   - private DownlinkMsg processTelemetryMessage(EdgeEvent edgeEvent) throws IOException {
  389 + private DownlinkMsg processTelemetryMessage(EdgeEvent edgeEvent) {
386 390 log.trace("Executing processTelemetryMessage, edgeEvent [{}]", edgeEvent);
387 391 EntityId entityId = null;
388 392 switch (edgeEvent.getEdgeEventType()) {
... ... @@ -823,6 +827,9 @@ public final class EdgeGrpcSession implements Closeable {
823 827 result.add(processPostTelemetry(entityId, entityData.getPostTelemetryMsg(), metaData));
824 828 }
825 829 }
  830 + if (entityData.hasAttributeDeleteMsg()) {
  831 + result.add(processAttributeDeleteMsg(entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType()));
  832 + }
826 833 }
827 834 }
828 835
... ... @@ -948,6 +955,26 @@ public final class EdgeGrpcSession implements Closeable {
948 955 return Futures.immediateFuture(null);
949 956 }
950 957
  958 + private ListenableFuture<Void> processAttributeDeleteMsg(EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
  959 + try {
  960 + String scope = attributeDeleteMsg.getScope();
  961 + List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();
  962 + ctx.getAttributesService().removeAll(edge.getTenantId(), entityId, scope, attributeNames);
  963 + if (EntityType.DEVICE.name().equals(entityType)) {
  964 + Set<AttributeKey> attributeKeys = new HashSet<>();
  965 + for (String attributeName : attributeNames) {
  966 + attributeKeys.add(new AttributeKey(scope, attributeName));
  967 + }
  968 + ctx.getTbClusterService().pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
  969 + edge.getTenantId(), (DeviceId) entityId, attributeKeys), null);
  970 + }
  971 + } catch (Exception e) {
  972 + log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, e);
  973 + return Futures.immediateFailedFuture(new RuntimeException("Can't process attribute delete msg " + attributeDeleteMsg, e));
  974 + }
  975 + return Futures.immediateFuture(null);
  976 + }
  977 +
951 978 private ListenableFuture<Void> onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) {
952 979 DeviceId edgeDeviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
953 980 switch (deviceUpdateMsg.getMsgType()) {
... ...
... ... @@ -43,9 +43,11 @@ public class EntityDataMsgConstructor {
43 43 case TIMESERIES_UPDATED:
44 44 try {
45 45 JsonObject data = entityData.getAsJsonObject();
46   - long ts = System.currentTimeMillis();
  46 + long ts;
47 47 if (data.get("ts") != null && !data.get("ts").isJsonNull()) {
48   - ts = data.getAsJsonObject("ts").getAsLong();
  48 + ts = data.getAsJsonPrimitive("ts").getAsLong();
  49 + } else {
  50 + ts = System.currentTimeMillis();
49 51 }
50 52 builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data.getAsJsonObject("data"), ts));
51 53 } catch (Exception e) {
... ...
... ... @@ -22,15 +22,15 @@ import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
22 22 import org.thingsboard.server.common.data.id.CustomerId;
23 23 import org.thingsboard.server.common.data.id.DashboardId;
24 24 import org.thingsboard.server.common.data.id.EdgeId;
  25 +import org.thingsboard.server.common.data.id.EntityId;
25 26 import org.thingsboard.server.common.data.id.RuleChainId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
27 28 import org.thingsboard.server.common.data.page.TextPageData;
28 29 import org.thingsboard.server.common.data.page.TextPageLink;
29   -import org.thingsboard.server.common.data.page.TimePageData;
30   -import org.thingsboard.server.common.data.page.TimePageLink;
31 30
32 31 import java.util.List;
33 32 import java.util.Optional;
  33 +import java.util.concurrent.Executor;
34 34
35 35 public interface EdgeService {
36 36
... ... @@ -75,6 +75,8 @@ public interface EdgeService {
75 75 ListenableFuture<List<Edge>> findEdgesByTenantIdAndRuleChainId(TenantId tenantId, RuleChainId ruleChainId);
76 76
77 77 ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(TenantId tenantId, DashboardId dashboardId);
  78 +
  79 + ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, Executor executorService);
78 80 }
79 81
80 82
... ...
... ... @@ -31,31 +31,35 @@ import org.thingsboard.server.common.data.Customer;
31 31 import org.thingsboard.server.common.data.EntitySubtype;
32 32 import org.thingsboard.server.common.data.EntityType;
33 33 import org.thingsboard.server.common.data.Tenant;
  34 +import org.thingsboard.server.common.data.User;
34 35 import org.thingsboard.server.common.data.edge.Edge;
35 36 import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
36 37 import org.thingsboard.server.common.data.id.CustomerId;
37 38 import org.thingsboard.server.common.data.id.DashboardId;
38 39 import org.thingsboard.server.common.data.id.EdgeId;
39 40 import org.thingsboard.server.common.data.id.EntityId;
  41 +import org.thingsboard.server.common.data.id.IdBased;
40 42 import org.thingsboard.server.common.data.id.RuleChainId;
41 43 import org.thingsboard.server.common.data.id.TenantId;
  44 +import org.thingsboard.server.common.data.id.UserId;
42 45 import org.thingsboard.server.common.data.page.TextPageData;
43 46 import org.thingsboard.server.common.data.page.TextPageLink;
44   -import org.thingsboard.server.common.data.page.TimePageData;
45   -import org.thingsboard.server.common.data.page.TimePageLink;
46 47 import org.thingsboard.server.common.data.relation.EntityRelation;
47 48 import org.thingsboard.server.common.data.relation.EntitySearchDirection;
  49 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
48 50 import org.thingsboard.server.common.data.rule.RuleChain;
49 51 import org.thingsboard.server.dao.customer.CustomerDao;
50 52 import org.thingsboard.server.dao.dashboard.DashboardService;
51 53 import org.thingsboard.server.dao.entity.AbstractEntityService;
52 54 import org.thingsboard.server.dao.exception.DataValidationException;
  55 +import org.thingsboard.server.dao.model.ModelConstants;
53 56 import org.thingsboard.server.dao.relation.RelationService;
54 57 import org.thingsboard.server.dao.rule.RuleChainService;
55 58 import org.thingsboard.server.dao.service.DataValidator;
56 59 import org.thingsboard.server.dao.service.PaginatedRemover;
57 60 import org.thingsboard.server.dao.service.Validator;
58 61 import org.thingsboard.server.dao.tenant.TenantDao;
  62 +import org.thingsboard.server.dao.user.UserService;
59 63
60 64 import javax.annotation.Nullable;
61 65 import java.util.ArrayList;
... ... @@ -63,6 +67,7 @@ import java.util.Collections;
63 67 import java.util.Comparator;
64 68 import java.util.List;
65 69 import java.util.Optional;
  70 +import java.util.concurrent.Executor;
66 71 import java.util.stream.Collectors;
67 72
68 73 import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE;
... ... @@ -92,6 +97,9 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
92 97 private CustomerDao customerDao;
93 98
94 99 @Autowired
  100 + private UserService userService;
  101 +
  102 + @Autowired
95 103 private CacheManager cacheManager;
96 104
97 105 @Autowired
... ... @@ -420,4 +428,47 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
420 428 }
421 429 };
422 430
  431 + @Override
  432 + public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, Executor executorService) {
  433 + switch (entityId.getEntityType()) {
  434 + case DEVICE:
  435 + case ASSET:
  436 + case ENTITY_VIEW:
  437 + ListenableFuture<List<EntityRelation>> originatorEdgeRelationsFuture =
  438 + relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
  439 + return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> {
  440 + if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) {
  441 + return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId()));
  442 + } else {
  443 + return Collections.emptyList();
  444 + }
  445 + }, executorService);
  446 + case DASHBOARD:
  447 + return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId())), executorService);
  448 + case RULE_CHAIN:
  449 + return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId())), executorService);
  450 + case USER:
  451 + User userById = userService.findUserById(tenantId, new UserId(entityId.getId()));
  452 + TextPageData<Edge> edges;
  453 + if (userById.getCustomerId() == null || userById.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
  454 + edges = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
  455 + } else {
  456 + edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE));
  457 + }
  458 + return convertToEdgeIds(Futures.immediateFuture(edges.getData()), executorService);
  459 + default:
  460 + return Futures.immediateFuture(Collections.emptyList());
  461 + }
  462 + }
  463 +
  464 + private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<List<Edge>> future, Executor executorService) {
  465 + return Futures.transform(future, edges -> {
  466 + if (edges != null && !edges.isEmpty()) {
  467 + return edges.stream().map(IdBased::getId).collect(Collectors.toList());
  468 + } else {
  469 + return Collections.emptyList();
  470 + }
  471 + }, executorService);
  472 + }
  473 +
423 474 }
... ...
... ... @@ -232,14 +232,7 @@ public class TbMsgPushToEdgeNode implements TbNode {
232 232 TextPageData<Edge> edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
233 233 return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList()));
234 234 } else {
235   - ListenableFuture<List<EntityRelation>> future = ctx.getRelationService().findByToAndTypeAsync(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
236   - return Futures.transform(future, relations -> {
237   - List<EdgeId> result = new ArrayList<>();
238   - if (relations != null && relations.size() > 0) {
239   - result.add(new EdgeId(relations.get(0).getFrom().getId()));
240   - }
241   - return result;
242   - }, ctx.getDbCallbackExecutor());
  235 + return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId, ctx.getDbCallbackExecutor());
243 236 }
244 237 }
245 238
... ...