...
|
...
|
@@ -16,6 +16,8 @@ |
16
|
16
|
package org.thingsboard.server.service.edge.rpc.init;
|
17
|
17
|
|
18
|
18
|
import com.google.common.util.concurrent.Futures;
|
|
19
|
+import com.google.common.util.concurrent.ListenableFuture;
|
|
20
|
+import com.google.common.util.concurrent.MoreExecutors;
|
19
|
21
|
import io.grpc.stub.StreamObserver;
|
20
|
22
|
import lombok.extern.slf4j.Slf4j;
|
21
|
23
|
import org.springframework.beans.factory.annotation.Autowired;
|
...
|
...
|
@@ -24,25 +26,31 @@ import org.thingsboard.server.common.data.Dashboard; |
24
|
26
|
import org.thingsboard.server.common.data.DashboardInfo;
|
25
|
27
|
import org.thingsboard.server.common.data.Device;
|
26
|
28
|
import org.thingsboard.server.common.data.EntityView;
|
|
29
|
+import org.thingsboard.server.common.data.Tenant;
|
27
|
30
|
import org.thingsboard.server.common.data.asset.Asset;
|
28
|
31
|
import org.thingsboard.server.common.data.edge.Edge;
|
|
32
|
+import org.thingsboard.server.common.data.id.EntityId;
|
29
|
33
|
import org.thingsboard.server.common.data.id.RuleChainId;
|
30
|
|
-import org.thingsboard.server.common.data.page.TextPageData;
|
31
|
|
-import org.thingsboard.server.common.data.page.TextPageLink;
|
32
|
34
|
import org.thingsboard.server.common.data.page.TimePageData;
|
33
|
35
|
import org.thingsboard.server.common.data.page.TimePageLink;
|
|
36
|
+import org.thingsboard.server.common.data.relation.EntityRelation;
|
|
37
|
+import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
|
|
38
|
+import org.thingsboard.server.common.data.relation.EntitySearchDirection;
|
|
39
|
+import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
|
34
|
40
|
import org.thingsboard.server.common.data.rule.RuleChain;
|
35
|
41
|
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
36
|
42
|
import org.thingsboard.server.dao.asset.AssetService;
|
37
|
43
|
import org.thingsboard.server.dao.dashboard.DashboardService;
|
38
|
44
|
import org.thingsboard.server.dao.device.DeviceService;
|
39
|
45
|
import org.thingsboard.server.dao.entityview.EntityViewService;
|
|
46
|
+import org.thingsboard.server.dao.relation.RelationService;
|
40
|
47
|
import org.thingsboard.server.dao.rule.RuleChainService;
|
41
|
48
|
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
|
42
|
49
|
import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
|
43
|
50
|
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
|
44
|
51
|
import org.thingsboard.server.gen.edge.EntityUpdateMsg;
|
45
|
52
|
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
|
|
53
|
+import org.thingsboard.server.gen.edge.RelationUpdateMsg;
|
46
|
54
|
import org.thingsboard.server.gen.edge.ResponseMsg;
|
47
|
55
|
import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
|
48
|
56
|
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
|
...
|
...
|
@@ -52,19 +60,26 @@ import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstru |
52
|
60
|
import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor;
|
53
|
61
|
import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor;
|
54
|
62
|
import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor;
|
|
63
|
+import org.thingsboard.server.service.edge.rpc.constructor.RelationUpdateMsgConstructor;
|
55
|
64
|
import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor;
|
56
|
65
|
|
|
66
|
+import java.util.ArrayList;
|
|
67
|
+import java.util.HashSet;
|
|
68
|
+import java.util.List;
|
|
69
|
+import java.util.Set;
|
57
|
70
|
import java.util.UUID;
|
58
|
|
-import java.util.concurrent.Future;
|
59
|
71
|
|
60
|
72
|
@Service
|
61
|
73
|
@Slf4j
|
62
|
|
-public class DefaultInitEdgeService implements InitEdgeService {
|
|
74
|
+public class DefaultSyncEdgeService implements SyncEdgeService {
|
63
|
75
|
|
64
|
76
|
@Autowired
|
65
|
77
|
private RuleChainService ruleChainService;
|
66
|
78
|
|
67
|
79
|
@Autowired
|
|
80
|
+ private RelationService relationService;
|
|
81
|
+
|
|
82
|
+ @Autowired
|
68
|
83
|
private DeviceService deviceService;
|
69
|
84
|
|
70
|
85
|
@Autowired
|
...
|
...
|
@@ -80,6 +95,9 @@ public class DefaultInitEdgeService implements InitEdgeService { |
80
|
95
|
private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor;
|
81
|
96
|
|
82
|
97
|
@Autowired
|
|
98
|
+ private RelationUpdateMsgConstructor relationUpdateMsgConstructor;
|
|
99
|
+
|
|
100
|
+ @Autowired
|
83
|
101
|
private DeviceUpdateMsgConstructor deviceUpdateMsgConstructor;
|
84
|
102
|
|
85
|
103
|
@Autowired
|
...
|
...
|
@@ -92,15 +110,68 @@ public class DefaultInitEdgeService implements InitEdgeService { |
92
|
110
|
private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor;
|
93
|
111
|
|
94
|
112
|
@Override
|
95
|
|
- public void init(Edge edge, StreamObserver<ResponseMsg> outputStream) {
|
96
|
|
- initRuleChains(edge, outputStream);
|
97
|
|
- initDevices(edge, outputStream);
|
98
|
|
- initAssets(edge, outputStream);
|
99
|
|
- initEntityViews(edge, outputStream);
|
100
|
|
- initDashboards(edge, outputStream);
|
|
113
|
+ public void sync(Edge edge, StreamObserver<ResponseMsg> outputStream) {
|
|
114
|
+ Set<EntityId> pushedEntityIds = new HashSet<>();
|
|
115
|
+ syncRuleChains(edge, pushedEntityIds, outputStream);
|
|
116
|
+ syncDevices(edge, pushedEntityIds, outputStream);
|
|
117
|
+ syncAssets(edge, pushedEntityIds, outputStream);
|
|
118
|
+ syncEntityViews(edge, pushedEntityIds, outputStream);
|
|
119
|
+ syncDashboards(edge, pushedEntityIds, outputStream);
|
|
120
|
+ syncRelations(edge, pushedEntityIds, outputStream);
|
|
121
|
+ }
|
|
122
|
+
|
|
123
|
+ private void syncRelations(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
|
|
124
|
+ if (!pushedEntityIds.isEmpty()) {
|
|
125
|
+ List<ListenableFuture<List<EntityRelation>>> futures = new ArrayList<>();
|
|
126
|
+ for (EntityId entityId : pushedEntityIds) {
|
|
127
|
+ futures.add(syncRelations(edge, entityId, EntitySearchDirection.FROM));
|
|
128
|
+ futures.add(syncRelations(edge, entityId, EntitySearchDirection.TO));
|
|
129
|
+ }
|
|
130
|
+ ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures);
|
|
131
|
+ Futures.transform(relationsListFuture, relationsList -> {
|
|
132
|
+ try {
|
|
133
|
+ Set<EntityRelation> uniqueEntityRelations = new HashSet<>();
|
|
134
|
+ if (!relationsList.isEmpty()) {
|
|
135
|
+ for (List<EntityRelation> entityRelations : relationsList) {
|
|
136
|
+ if (!entityRelations.isEmpty()) {
|
|
137
|
+ uniqueEntityRelations.addAll(entityRelations);
|
|
138
|
+ }
|
|
139
|
+ }
|
|
140
|
+ }
|
|
141
|
+ if (!uniqueEntityRelations.isEmpty()) {
|
|
142
|
+ log.trace("[{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), uniqueEntityRelations.size());
|
|
143
|
+ for (EntityRelation relation : uniqueEntityRelations) {
|
|
144
|
+ try {
|
|
145
|
+ RelationUpdateMsg relationUpdateMsg =
|
|
146
|
+ relationUpdateMsgConstructor.constructRelationUpdatedMsg(
|
|
147
|
+ UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE,
|
|
148
|
+ relation);
|
|
149
|
+ EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
|
|
150
|
+ .setRelationUpdateMsg(relationUpdateMsg)
|
|
151
|
+ .build();
|
|
152
|
+ outputStream.onNext(ResponseMsg.newBuilder()
|
|
153
|
+ .setEntityUpdateMsg(entityUpdateMsg)
|
|
154
|
+ .build());
|
|
155
|
+ } catch (Exception e) {
|
|
156
|
+ log.error("Exception during loading relation [{}] to edge on init!", relation, e);
|
|
157
|
+ }
|
|
158
|
+ }
|
|
159
|
+ }
|
|
160
|
+ } catch (Exception e) {
|
|
161
|
+ log.error("Exception during loading relation(s) to edge on init!", e);
|
|
162
|
+ }
|
|
163
|
+ return null;
|
|
164
|
+ }, MoreExecutors.directExecutor());
|
|
165
|
+ }
|
|
166
|
+ }
|
|
167
|
+
|
|
168
|
+ private ListenableFuture<List<EntityRelation>> syncRelations(Edge edge, EntityId entityId, EntitySearchDirection direction) {
|
|
169
|
+ EntityRelationsQuery query = new EntityRelationsQuery();
|
|
170
|
+ query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false));
|
|
171
|
+ return relationService.findByQuery(edge.getTenantId(), query);
|
101
|
172
|
}
|
102
|
173
|
|
103
|
|
- private void initDevices(Edge edge, StreamObserver<ResponseMsg> outputStream) {
|
|
174
|
+ private void syncDevices(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
|
104
|
175
|
try {
|
105
|
176
|
TimePageLink pageLink = new TimePageLink(100);
|
106
|
177
|
TimePageData<Device> pageData;
|
...
|
...
|
@@ -119,6 +190,7 @@ public class DefaultInitEdgeService implements InitEdgeService { |
119
|
190
|
outputStream.onNext(ResponseMsg.newBuilder()
|
120
|
191
|
.setEntityUpdateMsg(entityUpdateMsg)
|
121
|
192
|
.build());
|
|
193
|
+ pushedEntityIds.add(device.getId());
|
122
|
194
|
}
|
123
|
195
|
}
|
124
|
196
|
if (pageData.hasNext()) {
|
...
|
...
|
@@ -126,11 +198,11 @@ public class DefaultInitEdgeService implements InitEdgeService { |
126
|
198
|
}
|
127
|
199
|
} while (pageData.hasNext());
|
128
|
200
|
} catch (Exception e) {
|
129
|
|
- log.error("Exception during loading edge device(s) on init!");
|
|
201
|
+ log.error("Exception during loading edge device(s) on init!", e);
|
130
|
202
|
}
|
131
|
203
|
}
|
132
|
204
|
|
133
|
|
- private void initAssets(Edge edge, StreamObserver<ResponseMsg> outputStream) {
|
|
205
|
+ private void syncAssets(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
|
134
|
206
|
try {
|
135
|
207
|
TimePageLink pageLink = new TimePageLink(100);
|
136
|
208
|
TimePageData<Asset> pageData;
|
...
|
...
|
@@ -149,6 +221,7 @@ public class DefaultInitEdgeService implements InitEdgeService { |
149
|
221
|
outputStream.onNext(ResponseMsg.newBuilder()
|
150
|
222
|
.setEntityUpdateMsg(entityUpdateMsg)
|
151
|
223
|
.build());
|
|
224
|
+ pushedEntityIds.add(asset.getId());
|
152
|
225
|
}
|
153
|
226
|
}
|
154
|
227
|
if (pageData.hasNext()) {
|
...
|
...
|
@@ -156,11 +229,11 @@ public class DefaultInitEdgeService implements InitEdgeService { |
156
|
229
|
}
|
157
|
230
|
} while (pageData.hasNext());
|
158
|
231
|
} catch (Exception e) {
|
159
|
|
- log.error("Exception during loading edge asset(s) on init!");
|
|
232
|
+ log.error("Exception during loading edge asset(s) on init!", e);
|
160
|
233
|
}
|
161
|
234
|
}
|
162
|
235
|
|
163
|
|
- private void initEntityViews(Edge edge, StreamObserver<ResponseMsg> outputStream) {
|
|
236
|
+ private void syncEntityViews(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
|
164
|
237
|
try {
|
165
|
238
|
TimePageLink pageLink = new TimePageLink(100);
|
166
|
239
|
TimePageData<EntityView> pageData;
|
...
|
...
|
@@ -179,6 +252,7 @@ public class DefaultInitEdgeService implements InitEdgeService { |
179
|
252
|
outputStream.onNext(ResponseMsg.newBuilder()
|
180
|
253
|
.setEntityUpdateMsg(entityUpdateMsg)
|
181
|
254
|
.build());
|
|
255
|
+ pushedEntityIds.add(entityView.getId());
|
182
|
256
|
}
|
183
|
257
|
}
|
184
|
258
|
if (pageData.hasNext()) {
|
...
|
...
|
@@ -186,11 +260,11 @@ public class DefaultInitEdgeService implements InitEdgeService { |
186
|
260
|
}
|
187
|
261
|
} while (pageData.hasNext());
|
188
|
262
|
} catch (Exception e) {
|
189
|
|
- log.error("Exception during loading edge entity view(s) on init!");
|
|
263
|
+ log.error("Exception during loading edge entity view(s) on init!", e);
|
190
|
264
|
}
|
191
|
265
|
}
|
192
|
266
|
|
193
|
|
- private void initDashboards(Edge edge, StreamObserver<ResponseMsg> outputStream) {
|
|
267
|
+ private void syncDashboards(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
|
194
|
268
|
try {
|
195
|
269
|
TimePageLink pageLink = new TimePageLink(100);
|
196
|
270
|
TimePageData<DashboardInfo> pageData;
|
...
|
...
|
@@ -210,6 +284,7 @@ public class DefaultInitEdgeService implements InitEdgeService { |
210
|
284
|
outputStream.onNext(ResponseMsg.newBuilder()
|
211
|
285
|
.setEntityUpdateMsg(entityUpdateMsg)
|
212
|
286
|
.build());
|
|
287
|
+ pushedEntityIds.add(dashboard.getId());
|
213
|
288
|
}
|
214
|
289
|
}
|
215
|
290
|
if (pageData.hasNext()) {
|
...
|
...
|
@@ -217,11 +292,11 @@ public class DefaultInitEdgeService implements InitEdgeService { |
217
|
292
|
}
|
218
|
293
|
} while (pageData.hasNext());
|
219
|
294
|
} catch (Exception e) {
|
220
|
|
- log.error("Exception during loading edge dashboard(s) on init!");
|
|
295
|
+ log.error("Exception during loading edge dashboard(s) on init!", e);
|
221
|
296
|
}
|
222
|
297
|
}
|
223
|
298
|
|
224
|
|
- private void initRuleChains(Edge edge, StreamObserver<ResponseMsg> outputStream) {
|
|
299
|
+ private void syncRuleChains(Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) {
|
225
|
300
|
try {
|
226
|
301
|
TimePageLink pageLink = new TimePageLink(100);
|
227
|
302
|
TimePageData<RuleChain> pageData;
|
...
|
...
|
@@ -241,6 +316,7 @@ public class DefaultInitEdgeService implements InitEdgeService { |
241
|
316
|
outputStream.onNext(ResponseMsg.newBuilder()
|
242
|
317
|
.setEntityUpdateMsg(entityUpdateMsg)
|
243
|
318
|
.build());
|
|
319
|
+ pushedEntityIds.add(ruleChain.getId());
|
244
|
320
|
}
|
245
|
321
|
}
|
246
|
322
|
if (pageData.hasNext()) {
|
...
|
...
|
@@ -248,12 +324,12 @@ public class DefaultInitEdgeService implements InitEdgeService { |
248
|
324
|
}
|
249
|
325
|
} while (pageData.hasNext());
|
250
|
326
|
} catch (Exception e) {
|
251
|
|
- log.error("Exception during loading edge rule chain(s) on init!");
|
|
327
|
+ log.error("Exception during loading edge rule chain(s) on init!", e);
|
252
|
328
|
}
|
253
|
329
|
}
|
254
|
330
|
|
255
|
331
|
@Override
|
256
|
|
- public void initRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream) {
|
|
332
|
+ public void syncRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream) {
|
257
|
333
|
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
|
258
|
334
|
RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
|
259
|
335
|
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edge.getTenantId(), ruleChainId);
|
...
|
...
|
|