...
|
...
|
@@ -33,11 +33,15 @@ import org.thingsboard.server.common.data.DataConstants; |
33
|
33
|
import org.thingsboard.server.common.data.EdgeUtils;
|
34
|
34
|
import org.thingsboard.server.common.data.EntityType;
|
35
|
35
|
import org.thingsboard.server.common.data.audit.ActionType;
|
|
36
|
+import org.thingsboard.server.common.data.edge.Edge;
|
36
|
37
|
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
37
|
38
|
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
38
|
39
|
import org.thingsboard.server.common.data.id.EdgeId;
|
39
|
40
|
import org.thingsboard.server.common.data.id.EntityId;
|
|
41
|
+import org.thingsboard.server.common.data.id.IdBased;
|
40
|
42
|
import org.thingsboard.server.common.data.id.TenantId;
|
|
43
|
+import org.thingsboard.server.common.data.page.TextPageData;
|
|
44
|
+import org.thingsboard.server.common.data.page.TextPageLink;
|
41
|
45
|
import org.thingsboard.server.common.data.plugin.ComponentType;
|
42
|
46
|
import org.thingsboard.server.common.data.relation.EntityRelation;
|
43
|
47
|
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
...
|
...
|
@@ -46,10 +50,12 @@ import org.thingsboard.server.common.msg.TbMsg; |
46
|
50
|
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
47
|
51
|
|
48
|
52
|
import javax.annotation.Nullable;
|
|
53
|
+import java.util.ArrayList;
|
49
|
54
|
import java.util.HashMap;
|
50
|
55
|
import java.util.List;
|
51
|
56
|
import java.util.Map;
|
52
|
57
|
import java.util.UUID;
|
|
58
|
+import java.util.stream.Collectors;
|
53
|
59
|
|
54
|
60
|
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
|
55
|
61
|
|
...
|
...
|
@@ -84,34 +90,38 @@ public class TbMsgPushToEdgeNode implements TbNode { |
84
|
90
|
}
|
85
|
91
|
if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
|
86
|
92
|
if (isSupportedMsgType(msg.getType())) {
|
87
|
|
- ListenableFuture<EdgeId> getEdgeIdFuture = getEdgeIdByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator());
|
88
|
|
- Futures.addCallback(getEdgeIdFuture, new FutureCallback<EdgeId>() {
|
|
93
|
+ ListenableFuture<List<EdgeId>> getEdgeIdsFuture = getEdgeIdsByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator());
|
|
94
|
+ Futures.addCallback(getEdgeIdsFuture, new FutureCallback<List<EdgeId>>() {
|
89
|
95
|
@Override
|
90
|
|
- public void onSuccess(@Nullable EdgeId edgeId) {
|
91
|
|
- try {
|
92
|
|
- EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
|
93
|
|
- if (edgeEvent == null) {
|
94
|
|
- log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
|
95
|
|
- ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
|
96
|
|
- } else {
|
97
|
|
- edgeEvent.setEdgeId(edgeId);
|
98
|
|
- ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
|
99
|
|
- Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
|
100
|
|
- @Override
|
101
|
|
- public void onSuccess(@Nullable EdgeEvent event) {
|
102
|
|
- ctx.tellNext(msg, SUCCESS);
|
|
96
|
+ public void onSuccess(@Nullable List<EdgeId> edgeIds) {
|
|
97
|
+ if (edgeIds != null && !edgeIds.isEmpty()) {
|
|
98
|
+ for (EdgeId edgeId : edgeIds) {
|
|
99
|
+ try {
|
|
100
|
+ EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx);
|
|
101
|
+ if (edgeEvent == null) {
|
|
102
|
+ log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
|
|
103
|
+ ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
|
|
104
|
+ } else {
|
|
105
|
+ edgeEvent.setEdgeId(edgeId);
|
|
106
|
+ ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
|
|
107
|
+ Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
|
|
108
|
+ @Override
|
|
109
|
+ public void onSuccess(@Nullable EdgeEvent event) {
|
|
110
|
+ ctx.tellNext(msg, SUCCESS);
|
|
111
|
+ }
|
|
112
|
+
|
|
113
|
+ @Override
|
|
114
|
+ public void onFailure(Throwable th) {
|
|
115
|
+ log.error("Could not save edge event", th);
|
|
116
|
+ ctx.tellFailure(msg, th);
|
|
117
|
+ }
|
|
118
|
+ }, ctx.getDbCallbackExecutor());
|
103
|
119
|
}
|
104
|
|
-
|
105
|
|
- @Override
|
106
|
|
- public void onFailure(Throwable th) {
|
107
|
|
- log.error("Could not save edge event", th);
|
108
|
|
- ctx.tellFailure(msg, th);
|
109
|
|
- }
|
110
|
|
- }, ctx.getDbCallbackExecutor());
|
|
120
|
+ } catch (JsonProcessingException e) {
|
|
121
|
+ log.error("Failed to build edge event", e);
|
|
122
|
+ ctx.tellFailure(msg, e);
|
|
123
|
+ }
|
111
|
124
|
}
|
112
|
|
- } catch (JsonProcessingException e) {
|
113
|
|
- log.error("Failed to build edge event", e);
|
114
|
|
- ctx.tellFailure(msg, e);
|
115
|
125
|
}
|
116
|
126
|
}
|
117
|
127
|
|
...
|
...
|
@@ -201,6 +211,8 @@ public class TbMsgPushToEdgeNode implements TbNode { |
201
|
211
|
case ASSET:
|
202
|
212
|
case ENTITY_VIEW:
|
203
|
213
|
case DASHBOARD:
|
|
214
|
+ case TENANT:
|
|
215
|
+ case CUSTOMER:
|
204
|
216
|
return true;
|
205
|
217
|
default:
|
206
|
218
|
return false;
|
...
|
...
|
@@ -215,15 +227,20 @@ public class TbMsgPushToEdgeNode implements TbNode { |
215
|
227
|
|| DataConstants.ALARM.equals(msgType);
|
216
|
228
|
}
|
217
|
229
|
|
218
|
|
- private ListenableFuture<EdgeId> getEdgeIdByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) {
|
219
|
|
- ListenableFuture<List<EntityRelation>> future = ctx.getRelationService().findByToAndTypeAsync(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
|
220
|
|
- return Futures.transform(future, relations -> {
|
221
|
|
- if (relations != null && relations.size() > 0) {
|
222
|
|
- return new EdgeId(relations.get(0).getFrom().getId());
|
223
|
|
- } else {
|
224
|
|
- return null;
|
225
|
|
- }
|
226
|
|
- }, ctx.getDbCallbackExecutor());
|
|
230
|
+ private ListenableFuture<List<EdgeId>> getEdgeIdsByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) {
|
|
231
|
+ if (EntityType.TENANT.equals(originatorId.getEntityType())) {
|
|
232
|
+ TextPageData<Edge> edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
|
|
233
|
+ return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList()));
|
|
234
|
+ } else {
|
|
235
|
+ ListenableFuture<List<EntityRelation>> future = ctx.getRelationService().findByToAndTypeAsync(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
|
|
236
|
+ return Futures.transform(future, relations -> {
|
|
237
|
+ List<EdgeId> result = new ArrayList<>();
|
|
238
|
+ if (relations != null && relations.size() > 0) {
|
|
239
|
+ result.add(new EdgeId(relations.get(0).getFrom().getId()));
|
|
240
|
+ }
|
|
241
|
+ return result;
|
|
242
|
+ }, ctx.getDbCallbackExecutor());
|
|
243
|
+ }
|
227
|
244
|
}
|
228
|
245
|
|
229
|
246
|
@Override
|
...
|
...
|
|