Commit 0c7a51242d38989c9b593cdedb6930daf7c0d727
1 parent
69994adb
Tech depts. Added support for Edge entity on push to edge node
Showing
8 changed files
with
95 additions
and
81 deletions
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; | @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; | ||
25 | import org.checkerframework.checker.nullness.qual.Nullable; | 25 | import org.checkerframework.checker.nullness.qual.Nullable; |
26 | import org.springframework.beans.factory.annotation.Autowired; | 26 | import org.springframework.beans.factory.annotation.Autowired; |
27 | import org.springframework.stereotype.Service; | 27 | import org.springframework.stereotype.Service; |
28 | +import org.thingsboard.server.common.data.EdgeUtils; | ||
28 | import org.thingsboard.server.common.data.EntityType; | 29 | import org.thingsboard.server.common.data.EntityType; |
29 | import org.thingsboard.server.common.data.User; | 30 | import org.thingsboard.server.common.data.User; |
30 | import org.thingsboard.server.common.data.alarm.Alarm; | 31 | import org.thingsboard.server.common.data.alarm.Alarm; |
@@ -445,7 +446,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | @@ -445,7 +446,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | ||
445 | @Override | 446 | @Override |
446 | public void onSuccess(@Nullable Alarm alarm) { | 447 | public void onSuccess(@Nullable Alarm alarm) { |
447 | if (alarm != null) { | 448 | if (alarm != null) { |
448 | - EdgeEventType type = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); | 449 | + EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); |
449 | if (type != null) { | 450 | if (type != null) { |
450 | ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator()); | 451 | ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator()); |
451 | Futures.addCallback(relatedEdgeIdsByEntityIdFuture, new FutureCallback<List<EdgeId>>() { | 452 | Futures.addCallback(relatedEdgeIdsByEntityIdFuture, new FutureCallback<List<EdgeId>>() { |
@@ -518,20 +519,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | @@ -518,20 +519,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { | ||
518 | }, dbCallbackExecutorService); | 519 | }, dbCallbackExecutorService); |
519 | } | 520 | } |
520 | } | 521 | } |
521 | - | ||
522 | - private EdgeEventType getEdgeQueueTypeByEntityType(EntityType entityType) { | ||
523 | - switch (entityType) { | ||
524 | - case DEVICE: | ||
525 | - return EdgeEventType.DEVICE; | ||
526 | - case ASSET: | ||
527 | - return EdgeEventType.ASSET; | ||
528 | - case ENTITY_VIEW: | ||
529 | - return EdgeEventType.ENTITY_VIEW; | ||
530 | - default: | ||
531 | - log.debug("Unsupported entity type: [{}]", entityType); | ||
532 | - return null; | ||
533 | - } | ||
534 | - } | ||
535 | } | 522 | } |
536 | 523 | ||
537 | 524 |
@@ -431,6 +431,9 @@ public final class EdgeGrpcSession implements Closeable { | @@ -431,6 +431,9 @@ public final class EdgeGrpcSession implements Closeable { | ||
431 | case CUSTOMER: | 431 | case CUSTOMER: |
432 | entityId = new CustomerId(edgeEvent.getEntityId()); | 432 | entityId = new CustomerId(edgeEvent.getEntityId()); |
433 | break; | 433 | break; |
434 | + case EDGE: | ||
435 | + entityId = new EdgeId(edgeEvent.getEntityId()); | ||
436 | + break; | ||
434 | } | 437 | } |
435 | DownlinkMsg downlinkMsg = null; | 438 | DownlinkMsg downlinkMsg = null; |
436 | if (entityId != null) { | 439 | if (entityId != null) { |
@@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.AdminSettings; | @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.AdminSettings; | ||
35 | import org.thingsboard.server.common.data.DashboardInfo; | 35 | import org.thingsboard.server.common.data.DashboardInfo; |
36 | import org.thingsboard.server.common.data.DataConstants; | 36 | import org.thingsboard.server.common.data.DataConstants; |
37 | import org.thingsboard.server.common.data.Device; | 37 | import org.thingsboard.server.common.data.Device; |
38 | +import org.thingsboard.server.common.data.EdgeUtils; | ||
38 | import org.thingsboard.server.common.data.EntityType; | 39 | import org.thingsboard.server.common.data.EntityType; |
39 | import org.thingsboard.server.common.data.EntityView; | 40 | import org.thingsboard.server.common.data.EntityView; |
40 | import org.thingsboard.server.common.data.User; | 41 | import org.thingsboard.server.common.data.User; |
@@ -146,7 +147,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { | @@ -146,7 +147,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { | ||
146 | 147 | ||
147 | @Override | 148 | @Override |
148 | public void sync(Edge edge) { | 149 | public void sync(Edge edge) { |
149 | - log.trace("[{}] staring sync process for edge [{}]", edge.getTenantId(), edge.getName()); | 150 | + log.trace("[{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId()); |
150 | try { | 151 | try { |
151 | syncWidgetsBundleAndWidgetTypes(edge); | 152 | syncWidgetsBundleAndWidgetTypes(edge); |
152 | syncAdminSettings(edge); | 153 | syncAdminSettings(edge); |
@@ -157,7 +158,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { | @@ -157,7 +158,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { | ||
157 | syncEntityViews(edge, new TimePageLink(DEFAULT_LIMIT)); | 158 | syncEntityViews(edge, new TimePageLink(DEFAULT_LIMIT)); |
158 | syncDashboards(edge, new TimePageLink(DEFAULT_LIMIT)); | 159 | syncDashboards(edge, new TimePageLink(DEFAULT_LIMIT)); |
159 | } catch (Exception e) { | 160 | } catch (Exception e) { |
160 | - log.error("Exception during sync process", e); | 161 | + log.error("[{}][{}] Exception during sync process", edge.getTenantId(), edge.getId(), e); |
161 | } | 162 | } |
162 | } | 163 | } |
163 | 164 | ||
@@ -461,7 +462,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { | @@ -461,7 +462,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { | ||
461 | EntityId entityId = EntityIdFactory.getByTypeAndUuid( | 462 | EntityId entityId = EntityIdFactory.getByTypeAndUuid( |
462 | EntityType.valueOf(attributesRequestMsg.getEntityType()), | 463 | EntityType.valueOf(attributesRequestMsg.getEntityType()), |
463 | new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB())); | 464 | new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB())); |
464 | - final EdgeEventType type = getEdgeQueueTypeByEntityType(entityId.getEntityType()); | 465 | + final EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(entityId.getEntityType()); |
465 | if (type != null) { | 466 | if (type != null) { |
466 | SettableFuture<Void> futureToSet = SettableFuture.create(); | 467 | SettableFuture<Void> futureToSet = SettableFuture.create(); |
467 | String scope = attributesRequestMsg.getScope(); | 468 | String scope = attributesRequestMsg.getScope(); |
@@ -520,19 +521,6 @@ public class DefaultSyncEdgeService implements SyncEdgeService { | @@ -520,19 +521,6 @@ public class DefaultSyncEdgeService implements SyncEdgeService { | ||
520 | } | 521 | } |
521 | } | 522 | } |
522 | 523 | ||
523 | - private EdgeEventType getEdgeQueueTypeByEntityType(EntityType entityType) { | ||
524 | - switch (entityType) { | ||
525 | - case DEVICE: | ||
526 | - return EdgeEventType.DEVICE; | ||
527 | - case ASSET: | ||
528 | - return EdgeEventType.ASSET; | ||
529 | - case ENTITY_VIEW: | ||
530 | - return EdgeEventType.ENTITY_VIEW; | ||
531 | - default: | ||
532 | - return null; | ||
533 | - } | ||
534 | - } | ||
535 | - | ||
536 | @Override | 524 | @Override |
537 | public ListenableFuture<Void> processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg) { | 525 | public ListenableFuture<Void> processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg) { |
538 | log.trace("[{}] processRelationRequestMsg [{}][{}]", edge.getTenantId(), edge.getName(), relationRequestMsg); | 526 | log.trace("[{}] processRelationRequestMsg [{}][{}]", edge.getTenantId(), edge.getName(), relationRequestMsg); |
@@ -5,7 +5,7 @@ | @@ -5,7 +5,7 @@ | ||
5 | * you may not use this file except in compliance with the License. | 5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at | 6 | * You may obtain a copy of the License at |
7 | * | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * | 9 | * |
10 | * Unless required by applicable law or agreed to in writing, software | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
@@ -15,8 +15,10 @@ | @@ -15,8 +15,10 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.common.data; | 16 | package org.thingsboard.server.common.data; |
17 | 17 | ||
18 | +import lombok.extern.slf4j.Slf4j; | ||
18 | import org.thingsboard.server.common.data.edge.EdgeEventType; | 19 | import org.thingsboard.server.common.data.edge.EdgeEventType; |
19 | 20 | ||
21 | +@Slf4j | ||
20 | public final class EdgeUtils { | 22 | public final class EdgeUtils { |
21 | 23 | ||
22 | private EdgeUtils() { | 24 | private EdgeUtils() { |
@@ -49,6 +51,7 @@ public final class EdgeUtils { | @@ -49,6 +51,7 @@ public final class EdgeUtils { | ||
49 | case WIDGET_TYPE: | 51 | case WIDGET_TYPE: |
50 | return EdgeEventType.WIDGET_TYPE; | 52 | return EdgeEventType.WIDGET_TYPE; |
51 | default: | 53 | default: |
54 | + log.warn("Unsupported entity type [{}]", entityType); | ||
52 | return null; | 55 | return null; |
53 | } | 56 | } |
54 | } | 57 | } |
@@ -469,12 +469,16 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic | @@ -469,12 +469,16 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic | ||
469 | @Override | 469 | @Override |
470 | public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) { | 470 | public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) { |
471 | log.trace("[{}] Executing findRelatedEdgeIdsByEntityId [{}]", tenantId, entityId); | 471 | log.trace("[{}] Executing findRelatedEdgeIdsByEntityId [{}]", tenantId, entityId); |
472 | - if (EntityType.TENANT.equals(entityId.getEntityType())) { | 472 | + if (EntityType.TENANT.equals(entityId.getEntityType()) || EntityType.CUSTOMER.equals(entityId.getEntityType())) { |
473 | List<EdgeId> result = new ArrayList<>(); | 473 | List<EdgeId> result = new ArrayList<>(); |
474 | TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT); | 474 | TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT); |
475 | TextPageData<Edge> pageData; | 475 | TextPageData<Edge> pageData; |
476 | do { | 476 | do { |
477 | - pageData = findEdgesByTenantId(tenantId, pageLink); | 477 | + if (EntityType.TENANT.equals(entityId.getEntityType())) { |
478 | + pageData = findEdgesByTenantId(tenantId, pageLink); | ||
479 | + } else { | ||
480 | + pageData = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), pageLink); | ||
481 | + } | ||
478 | if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { | 482 | if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
479 | for (Edge edge : pageData.getData()) { | 483 | for (Edge edge : pageData.getData()) { |
480 | result.add(edge.getId()); | 484 | result.add(edge.getId()); |
@@ -90,6 +90,7 @@ public abstract class AbstractEntityService { | @@ -90,6 +90,7 @@ public abstract class AbstractEntityService { | ||
90 | List<EntityView> entityViews = entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId).get(); | 90 | List<EntityView> entityViews = entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId).get(); |
91 | if (entityViews != null && !entityViews.isEmpty()) { | 91 | if (entityViews != null && !entityViews.isEmpty()) { |
92 | EntityView entityView = entityViews.get(0); | 92 | EntityView entityView = entityViews.get(0); |
93 | + // TODO: voba - refactor this blocking operation in 3.3+ | ||
93 | Boolean relationExists = relationService.checkRelation(tenantId,edgeId, entityView.getId(), | 94 | Boolean relationExists = relationService.checkRelation(tenantId,edgeId, entityView.getId(), |
94 | EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE).get(); | 95 | EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE).get(); |
95 | if (relationExists) { | 96 | if (relationExists) { |
@@ -97,7 +98,7 @@ public abstract class AbstractEntityService { | @@ -97,7 +98,7 @@ public abstract class AbstractEntityService { | ||
97 | } | 98 | } |
98 | } | 99 | } |
99 | } catch (ExecutionException | InterruptedException e) { | 100 | } catch (ExecutionException | InterruptedException e) { |
100 | - log.error("Exception while finding entity views for entityId [{}]", entityId, e); | 101 | + log.error("[{}] Exception while finding entity views for entityId [{}]", tenantId, entityId, e); |
101 | throw new RuntimeException("Exception while finding entity views for entityId [" + entityId + "]", e); | 102 | throw new RuntimeException("Exception while finding entity views for entityId [" + entityId + "]", e); |
102 | } | 103 | } |
103 | } | 104 | } |
@@ -59,7 +59,7 @@ public class BaseEventService implements EventService { | @@ -59,7 +59,7 @@ public class BaseEventService implements EventService { | ||
59 | public Optional<Event> saveIfNotExists(Event event) { | 59 | public Optional<Event> saveIfNotExists(Event event) { |
60 | eventValidator.validate(event, Event::getTenantId); | 60 | eventValidator.validate(event, Event::getTenantId); |
61 | if (StringUtils.isEmpty(event.getUid())) { | 61 | if (StringUtils.isEmpty(event.getUid())) { |
62 | - throw new DataValidationException("Event uid should be specified!"); | 62 | + throw new DataValidationException("Event uid should be specified!."); |
63 | } | 63 | } |
64 | checkAndTruncateDebugEvent(event); | 64 | checkAndTruncateDebugEvent(event); |
65 | return eventDao.saveIfNotExists(event); | 65 | return eventDao.saveIfNotExists(event); |
@@ -79,16 +79,16 @@ public class BaseEventService implements EventService { | @@ -79,16 +79,16 @@ public class BaseEventService implements EventService { | ||
79 | @Override | 79 | @Override |
80 | public Optional<Event> findEvent(TenantId tenantId, EntityId entityId, String eventType, String eventUid) { | 80 | public Optional<Event> findEvent(TenantId tenantId, EntityId entityId, String eventType, String eventUid) { |
81 | if (tenantId == null) { | 81 | if (tenantId == null) { |
82 | - throw new DataValidationException("Tenant id should be specified!"); | 82 | + throw new DataValidationException("Tenant id should be specified!."); |
83 | } | 83 | } |
84 | if (entityId == null) { | 84 | if (entityId == null) { |
85 | - throw new DataValidationException("Entity id should be specified!"); | 85 | + throw new DataValidationException("Entity id should be specified!."); |
86 | } | 86 | } |
87 | if (StringUtils.isEmpty(eventType)) { | 87 | if (StringUtils.isEmpty(eventType)) { |
88 | - throw new DataValidationException("Event type should be specified!"); | 88 | + throw new DataValidationException("Event type should be specified!."); |
89 | } | 89 | } |
90 | if (StringUtils.isEmpty(eventUid)) { | 90 | if (StringUtils.isEmpty(eventUid)) { |
91 | - throw new DataValidationException("Event uid should be specified!"); | 91 | + throw new DataValidationException("Event uid should be specified!."); |
92 | } | 92 | } |
93 | Event event = eventDao.findEvent(tenantId.getId(), entityId, eventType, eventUid); | 93 | Event event = eventDao.findEvent(tenantId.getId(), entityId, eventType, eventUid); |
94 | return event != null ? Optional.of(event) : Optional.empty(); | 94 | return event != null ? Optional.of(event) : Optional.empty(); |
@@ -131,13 +131,13 @@ public class BaseEventService implements EventService { | @@ -131,13 +131,13 @@ public class BaseEventService implements EventService { | ||
131 | @Override | 131 | @Override |
132 | protected void validateDataImpl(TenantId tenantId, Event event) { | 132 | protected void validateDataImpl(TenantId tenantId, Event event) { |
133 | if (event.getEntityId() == null) { | 133 | if (event.getEntityId() == null) { |
134 | - throw new DataValidationException("Entity id should be specified!"); | 134 | + throw new DataValidationException("Entity id should be specified!."); |
135 | } | 135 | } |
136 | if (StringUtils.isEmpty(event.getType())) { | 136 | if (StringUtils.isEmpty(event.getType())) { |
137 | - throw new DataValidationException("Event type should be specified!"); | 137 | + throw new DataValidationException("Event type should be specified!."); |
138 | } | 138 | } |
139 | if (event.getBody() == null) { | 139 | if (event.getBody() == null) { |
140 | - throw new DataValidationException("Event body should be specified!"); | 140 | + throw new DataValidationException("Event body should be specified!."); |
141 | } | 141 | } |
142 | } | 142 | } |
143 | }; | 143 | }; |
@@ -5,7 +5,7 @@ | @@ -5,7 +5,7 @@ | ||
5 | * you may not use this file except in compliance with the License. | 5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at | 6 | * You may obtain a copy of the License at |
7 | * | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * | 9 | * |
10 | * Unless required by applicable law or agreed to in writing, software | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
@@ -57,7 +57,7 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; | @@ -57,7 +57,7 @@ import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; | ||
57 | name = "push to edge", | 57 | name = "push to edge", |
58 | configClazz = EmptyNodeConfiguration.class, | 58 | configClazz = EmptyNodeConfiguration.class, |
59 | nodeDescription = "Pushes messages to edge", | 59 | nodeDescription = "Pushes messages to edge", |
60 | - nodeDetails = "Pushes messages to edge, if Message Originator assigned to particular edge or is EDGE entity. This node is used only on Cloud instances to push messages from Cloud to Edge. Supports only DEVICE, ENTITY_VIEW, ASSET and EDGE Message Originator(s).", | 60 | + nodeDetails = "Pushes messages to edge, if Message Originator assigned to particular edge or is EDGE entity. This node is used only on Cloud instances to push messages from Cloud to Edge. Supports only DEVICE, ENTITY_VIEW, ASSET, ENTITY_VIEW, DASHBOARD, TENANT, CUSTOMER and EDGE Message Originator(s).", |
61 | uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"}, | 61 | uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"}, |
62 | configDirective = "tbNodeEmptyConfig", | 62 | configDirective = "tbNodeEmptyConfig", |
63 | icon = "cloud_download", | 63 | icon = "cloud_download", |
@@ -97,48 +97,75 @@ public class TbMsgPushToEdgeNode implements TbNode { | @@ -97,48 +97,75 @@ public class TbMsgPushToEdgeNode implements TbNode { | ||
97 | } | 97 | } |
98 | 98 | ||
99 | private void processMsg(TbContext ctx, TbMsg msg) { | 99 | private void processMsg(TbContext ctx, TbMsg msg) { |
100 | - ListenableFuture<List<EdgeId>> getEdgeIdsFuture = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator()); | ||
101 | - Futures.addCallback(getEdgeIdsFuture, new FutureCallback<List<EdgeId>>() { | ||
102 | - @Override | ||
103 | - public void onSuccess(@Nullable List<EdgeId> edgeIds) { | ||
104 | - if (edgeIds != null && !edgeIds.isEmpty()) { | ||
105 | - for (EdgeId edgeId : edgeIds) { | ||
106 | - try { | ||
107 | - EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); | ||
108 | - if (edgeEvent == null) { | ||
109 | - log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); | ||
110 | - ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); | ||
111 | - } else { | ||
112 | - edgeEvent.setEdgeId(edgeId); | ||
113 | - ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); | ||
114 | - Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() { | ||
115 | - @Override | ||
116 | - public void onSuccess(@Nullable EdgeEvent event) { | ||
117 | - ctx.tellNext(msg, SUCCESS); | ||
118 | - ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); | ||
119 | - } | ||
120 | - | ||
121 | - @Override | ||
122 | - public void onFailure(Throwable th) { | ||
123 | - log.warn("[{}] Can't save edge event [{}] for edge [{}]", ctx.getTenantId().getId(), edgeEvent, edgeId.getId(), th); | ||
124 | - ctx.tellFailure(msg, th); | ||
125 | - } | ||
126 | - }, ctx.getDbCallbackExecutor()); | 100 | + if (EntityType.EDGE.equals(msg.getOriginator().getEntityType())) { |
101 | + try { | ||
102 | + EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); | ||
103 | + if (edgeEvent != null) { | ||
104 | + EdgeId edgeId = new EdgeId(msg.getOriginator().getId()); | ||
105 | + edgeEvent.setEdgeId(edgeId); | ||
106 | + ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); | ||
107 | + Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() { | ||
108 | + @Override | ||
109 | + public void onSuccess(@Nullable EdgeEvent event) { | ||
110 | + ctx.tellNext(msg, SUCCESS); | ||
111 | + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); | ||
112 | + } | ||
113 | + | ||
114 | + @Override | ||
115 | + public void onFailure(Throwable th) { | ||
116 | + log.warn("[{}] Can't save edge event [{}] for edge [{}]", ctx.getTenantId().getId(), edgeEvent, edgeId.getId(), th); | ||
117 | + ctx.tellFailure(msg, th); | ||
118 | + } | ||
119 | + }, ctx.getDbCallbackExecutor()); | ||
120 | + } | ||
121 | + } catch (JsonProcessingException e) { | ||
122 | + log.error("Failed to build edge event", e); | ||
123 | + ctx.tellFailure(msg, e); | ||
124 | + } | ||
125 | + } else { | ||
126 | + ListenableFuture<List<EdgeId>> getEdgeIdsFuture = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator()); | ||
127 | + Futures.addCallback(getEdgeIdsFuture, new FutureCallback<List<EdgeId>>() { | ||
128 | + @Override | ||
129 | + public void onSuccess(@Nullable List<EdgeId> edgeIds) { | ||
130 | + if (edgeIds != null && !edgeIds.isEmpty()) { | ||
131 | + for (EdgeId edgeId : edgeIds) { | ||
132 | + try { | ||
133 | + EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); | ||
134 | + if (edgeEvent == null) { | ||
135 | + log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); | ||
136 | + ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); | ||
137 | + } else { | ||
138 | + edgeEvent.setEdgeId(edgeId); | ||
139 | + ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); | ||
140 | + Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() { | ||
141 | + @Override | ||
142 | + public void onSuccess(@Nullable EdgeEvent event) { | ||
143 | + ctx.tellNext(msg, SUCCESS); | ||
144 | + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); | ||
145 | + } | ||
146 | + | ||
147 | + @Override | ||
148 | + public void onFailure(Throwable th) { | ||
149 | + log.warn("[{}] Can't save edge event [{}] for edge [{}]", ctx.getTenantId().getId(), edgeEvent, edgeId.getId(), th); | ||
150 | + ctx.tellFailure(msg, th); | ||
151 | + } | ||
152 | + }, ctx.getDbCallbackExecutor()); | ||
153 | + } | ||
154 | + } catch (JsonProcessingException e) { | ||
155 | + log.error("Failed to build edge event", e); | ||
156 | + ctx.tellFailure(msg, e); | ||
127 | } | 157 | } |
128 | - } catch (JsonProcessingException e) { | ||
129 | - log.error("Failed to build edge event", e); | ||
130 | - ctx.tellFailure(msg, e); | ||
131 | } | 158 | } |
132 | } | 159 | } |
133 | } | 160 | } |
134 | - } | ||
135 | 161 | ||
136 | - @Override | ||
137 | - public void onFailure(Throwable t) { | ||
138 | - ctx.tellFailure(msg, t); | ||
139 | - } | 162 | + @Override |
163 | + public void onFailure(Throwable t) { | ||
164 | + ctx.tellFailure(msg, t); | ||
165 | + } | ||
140 | 166 | ||
141 | - }, ctx.getDbCallbackExecutor()); | 167 | + }, ctx.getDbCallbackExecutor()); |
168 | + } | ||
142 | } | 169 | } |
143 | 170 | ||
144 | private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) throws JsonProcessingException { | 171 | private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) throws JsonProcessingException { |
@@ -221,6 +248,7 @@ public class TbMsgPushToEdgeNode implements TbNode { | @@ -221,6 +248,7 @@ public class TbMsgPushToEdgeNode implements TbNode { | ||
221 | case DASHBOARD: | 248 | case DASHBOARD: |
222 | case TENANT: | 249 | case TENANT: |
223 | case CUSTOMER: | 250 | case CUSTOMER: |
251 | + case EDGE: | ||
224 | return true; | 252 | return true; |
225 | default: | 253 | default: |
226 | return false; | 254 | return false; |