...
|
...
|
@@ -33,7 +33,6 @@ import org.springframework.core.io.DefaultResourceLoader; |
33
|
33
|
import org.springframework.stereotype.Service;
|
34
|
34
|
import org.thingsboard.server.common.data.AdminSettings;
|
35
|
35
|
import org.thingsboard.server.common.data.DashboardInfo;
|
36
|
|
-import org.thingsboard.server.common.data.DataConstants;
|
37
|
36
|
import org.thingsboard.server.common.data.Device;
|
38
|
37
|
import org.thingsboard.server.common.data.EdgeUtils;
|
39
|
38
|
import org.thingsboard.server.common.data.EntityType;
|
...
|
...
|
@@ -146,37 +145,37 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
146
|
145
|
private TbClusterService tbClusterService;
|
147
|
146
|
|
148
|
147
|
@Override
|
149
|
|
- public void sync(Edge edge) {
|
150
|
|
- log.trace("[{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId());
|
|
148
|
+ public void sync(TenantId tenantId, Edge edge) {
|
|
149
|
+ log.trace("[{}][{}] Staring edge sync process", tenantId, edge.getId());
|
151
|
150
|
try {
|
152
|
|
- syncWidgetsBundleAndWidgetTypes(edge);
|
153
|
|
- syncAdminSettings(edge);
|
154
|
|
- syncRuleChains(edge, new TimePageLink(DEFAULT_LIMIT));
|
155
|
|
- syncUsers(edge, new TextPageLink(DEFAULT_LIMIT));
|
156
|
|
- syncDevices(edge, new TimePageLink(DEFAULT_LIMIT));
|
157
|
|
- syncAssets(edge, new TimePageLink(DEFAULT_LIMIT));
|
158
|
|
- syncEntityViews(edge, new TimePageLink(DEFAULT_LIMIT));
|
159
|
|
- syncDashboards(edge, new TimePageLink(DEFAULT_LIMIT));
|
|
151
|
+ syncWidgetsBundleAndWidgetTypes(tenantId, edge);
|
|
152
|
+ syncAdminSettings(tenantId, edge);
|
|
153
|
+ syncRuleChains(tenantId, edge, new TimePageLink(DEFAULT_LIMIT));
|
|
154
|
+ syncUsers(tenantId, edge, new TextPageLink(DEFAULT_LIMIT));
|
|
155
|
+ syncDevices(tenantId, edge, new TimePageLink(DEFAULT_LIMIT));
|
|
156
|
+ syncAssets(tenantId, edge, new TimePageLink(DEFAULT_LIMIT));
|
|
157
|
+ syncEntityViews(tenantId, edge, new TimePageLink(DEFAULT_LIMIT));
|
|
158
|
+ syncDashboards(tenantId, edge, new TimePageLink(DEFAULT_LIMIT));
|
160
|
159
|
} catch (Exception e) {
|
161
|
|
- log.error("[{}][{}] Exception during sync process", edge.getTenantId(), edge.getId(), e);
|
|
160
|
+ log.error("[{}][{}] Exception during sync process", tenantId, edge.getId(), e);
|
162
|
161
|
}
|
163
|
162
|
}
|
164
|
163
|
|
165
|
|
- private void syncRuleChains(Edge edge, TimePageLink pageLink) {
|
166
|
|
- log.trace("[{}] syncRuleChains [{}] [{}]", edge.getTenantId(), edge.getName(), pageLink);
|
|
164
|
+ private void syncRuleChains(TenantId tenantId, Edge edge, TimePageLink pageLink) {
|
|
165
|
+ log.trace("[{}] syncRuleChains [{}] [{}]", tenantId, edge.getName(), pageLink);
|
167
|
166
|
try {
|
168
|
167
|
ListenableFuture<TimePageData<RuleChain>> future =
|
169
|
|
- ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
|
|
168
|
+ ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
|
170
|
169
|
Futures.addCallback(future, new FutureCallback<TimePageData<RuleChain>>() {
|
171
|
170
|
@Override
|
172
|
171
|
public void onSuccess(@Nullable TimePageData<RuleChain> pageData) {
|
173
|
172
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
174
|
173
|
log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
|
175
|
174
|
for (RuleChain ruleChain : pageData.getData()) {
|
176
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.ADDED, ruleChain.getId(), null);
|
|
175
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.ADDED, ruleChain.getId(), null);
|
177
|
176
|
}
|
178
|
177
|
if (pageData.hasNext()) {
|
179
|
|
- syncRuleChains(edge, pageData.getNextPageLink());
|
|
178
|
+ syncRuleChains(tenantId, edge, pageData.getNextPageLink());
|
180
|
179
|
}
|
181
|
180
|
}
|
182
|
181
|
}
|
...
|
...
|
@@ -191,21 +190,21 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
191
|
190
|
}
|
192
|
191
|
}
|
193
|
192
|
|
194
|
|
- private void syncDevices(Edge edge, TimePageLink pageLink) {
|
195
|
|
- log.trace("[{}] syncDevices [{}]", edge.getTenantId(), edge.getName());
|
|
193
|
+ private void syncDevices(TenantId tenantId, Edge edge, TimePageLink pageLink) {
|
|
194
|
+ log.trace("[{}] syncDevices [{}]", tenantId, edge.getName());
|
196
|
195
|
try {
|
197
|
196
|
ListenableFuture<TimePageData<Device>> future =
|
198
|
|
- deviceService.findDevicesByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
|
|
197
|
+ deviceService.findDevicesByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
|
199
|
198
|
Futures.addCallback(future, new FutureCallback<TimePageData<Device>>() {
|
200
|
199
|
@Override
|
201
|
200
|
public void onSuccess(@Nullable TimePageData<Device> pageData) {
|
202
|
201
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
203
|
202
|
log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
|
204
|
203
|
for (Device device : pageData.getData()) {
|
205
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null);
|
|
204
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null);
|
206
|
205
|
}
|
207
|
206
|
if (pageData.hasNext()) {
|
208
|
|
- syncDevices(edge, pageData.getNextPageLink());
|
|
207
|
+ syncDevices(tenantId, edge, pageData.getNextPageLink());
|
209
|
208
|
}
|
210
|
209
|
}
|
211
|
210
|
}
|
...
|
...
|
@@ -220,20 +219,20 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
220
|
219
|
}
|
221
|
220
|
}
|
222
|
221
|
|
223
|
|
- private void syncAssets(Edge edge, TimePageLink pageLink) {
|
224
|
|
- log.trace("[{}] syncAssets [{}]", edge.getTenantId(), edge.getName());
|
|
222
|
+ private void syncAssets(TenantId tenantId, Edge edge, TimePageLink pageLink) {
|
|
223
|
+ log.trace("[{}] syncAssets [{}]", tenantId, edge.getName());
|
225
|
224
|
try {
|
226
|
|
- ListenableFuture<TimePageData<Asset>> future = assetService.findAssetsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
|
|
225
|
+ ListenableFuture<TimePageData<Asset>> future = assetService.findAssetsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
|
227
|
226
|
Futures.addCallback(future, new FutureCallback<TimePageData<Asset>>() {
|
228
|
227
|
@Override
|
229
|
228
|
public void onSuccess(@Nullable TimePageData<Asset> pageData) {
|
230
|
229
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
231
|
230
|
log.trace("[{}] [{}] asset(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
|
232
|
231
|
for (Asset asset : pageData.getData()) {
|
233
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.ASSET, EdgeEventActionType.ADDED, asset.getId(), null);
|
|
232
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET, EdgeEventActionType.ADDED, asset.getId(), null);
|
234
|
233
|
}
|
235
|
234
|
if (pageData.hasNext()) {
|
236
|
|
- syncAssets(edge, pageData.getNextPageLink());
|
|
235
|
+ syncAssets(tenantId, edge, pageData.getNextPageLink());
|
237
|
236
|
}
|
238
|
237
|
}
|
239
|
238
|
}
|
...
|
...
|
@@ -248,20 +247,20 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
248
|
247
|
}
|
249
|
248
|
}
|
250
|
249
|
|
251
|
|
- private void syncEntityViews(Edge edge, TimePageLink pageLink) {
|
252
|
|
- log.trace("[{}] syncEntityViews [{}]", edge.getTenantId(), edge.getName());
|
|
250
|
+ private void syncEntityViews(TenantId tenantId, Edge edge, TimePageLink pageLink) {
|
|
251
|
+ log.trace("[{}] syncEntityViews [{}]", tenantId, edge.getName());
|
253
|
252
|
try {
|
254
|
|
- ListenableFuture<TimePageData<EntityView>> future = entityViewService.findEntityViewsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
|
|
253
|
+ ListenableFuture<TimePageData<EntityView>> future = entityViewService.findEntityViewsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
|
255
|
254
|
Futures.addCallback(future, new FutureCallback<TimePageData<EntityView>>() {
|
256
|
255
|
@Override
|
257
|
256
|
public void onSuccess(@Nullable TimePageData<EntityView> pageData) {
|
258
|
257
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
259
|
258
|
log.trace("[{}] [{}] entity view(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
|
260
|
259
|
for (EntityView entityView : pageData.getData()) {
|
261
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.ENTITY_VIEW, EdgeEventActionType.ADDED, entityView.getId(), null);
|
|
260
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, EdgeEventActionType.ADDED, entityView.getId(), null);
|
262
|
261
|
}
|
263
|
262
|
if (pageData.hasNext()) {
|
264
|
|
- syncEntityViews(edge, pageData.getNextPageLink());
|
|
263
|
+ syncEntityViews(tenantId, edge, pageData.getNextPageLink());
|
265
|
264
|
}
|
266
|
265
|
}
|
267
|
266
|
}
|
...
|
...
|
@@ -276,20 +275,20 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
276
|
275
|
}
|
277
|
276
|
}
|
278
|
277
|
|
279
|
|
- private void syncDashboards(Edge edge, TimePageLink pageLink) {
|
280
|
|
- log.trace("[{}] syncDashboards [{}]", edge.getTenantId(), edge.getName());
|
|
278
|
+ private void syncDashboards(TenantId tenantId, Edge edge, TimePageLink pageLink) {
|
|
279
|
+ log.trace("[{}] syncDashboards [{}]", tenantId, edge.getName());
|
281
|
280
|
try {
|
282
|
|
- ListenableFuture<TimePageData<DashboardInfo>> future = dashboardService.findDashboardsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
|
|
281
|
+ ListenableFuture<TimePageData<DashboardInfo>> future = dashboardService.findDashboardsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
|
283
|
282
|
Futures.addCallback(future, new FutureCallback<TimePageData<DashboardInfo>>() {
|
284
|
283
|
@Override
|
285
|
284
|
public void onSuccess(@Nullable TimePageData<DashboardInfo> pageData) {
|
286
|
285
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
287
|
286
|
log.trace("[{}] [{}] dashboard(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
|
288
|
287
|
for (DashboardInfo dashboardInfo : pageData.getData()) {
|
289
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DASHBOARD, EdgeEventActionType.ADDED, dashboardInfo.getId(), null);
|
|
288
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DASHBOARD, EdgeEventActionType.ADDED, dashboardInfo.getId(), null);
|
290
|
289
|
}
|
291
|
290
|
if (pageData.hasNext()) {
|
292
|
|
- syncDashboards(edge, pageData.getNextPageLink());
|
|
291
|
+ syncDashboards(tenantId, edge, pageData.getNextPageLink());
|
293
|
292
|
}
|
294
|
293
|
}
|
295
|
294
|
}
|
...
|
...
|
@@ -304,31 +303,31 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
304
|
303
|
}
|
305
|
304
|
}
|
306
|
305
|
|
307
|
|
- private void syncUsers(Edge edge, TextPageLink pageLink) {
|
308
|
|
- log.trace("[{}] syncUsers [{}]", edge.getTenantId(), edge.getName());
|
|
306
|
+ private void syncUsers(TenantId tenantId, Edge edge, TextPageLink pageLink) {
|
|
307
|
+ log.trace("[{}] syncUsers [{}]", tenantId, edge.getName());
|
309
|
308
|
try {
|
310
|
309
|
TextPageData<User> pageData;
|
311
|
310
|
do {
|
312
|
|
- pageData = userService.findTenantAdmins(edge.getTenantId(), pageLink);
|
313
|
|
- pushUsersToEdge(pageData, edge);
|
|
311
|
+ pageData = userService.findTenantAdmins(tenantId, pageLink);
|
|
312
|
+ pushUsersToEdge(tenantId, pageData, edge);
|
314
|
313
|
if (pageData != null && pageData.hasNext()) {
|
315
|
314
|
pageLink = pageData.getNextPageLink();
|
316
|
315
|
}
|
317
|
316
|
} while (pageData != null && pageData.hasNext());
|
318
|
|
- syncCustomerUsers(edge);
|
|
317
|
+ syncCustomerUsers(tenantId, edge);
|
319
|
318
|
} catch (Exception e) {
|
320
|
319
|
log.error("Exception during loading edge user(s) on sync!", e);
|
321
|
320
|
}
|
322
|
321
|
}
|
323
|
322
|
|
324
|
|
- private void syncCustomerUsers(Edge edge) {
|
|
323
|
+ private void syncCustomerUsers(TenantId tenantId, Edge edge) {
|
325
|
324
|
if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
|
326
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null);
|
|
325
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null);
|
327
|
326
|
TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
|
328
|
327
|
TextPageData<User> pageData;
|
329
|
328
|
do {
|
330
|
|
- pageData = userService.findCustomerUsers(edge.getTenantId(), edge.getCustomerId(), pageLink);
|
331
|
|
- pushUsersToEdge(pageData, edge);
|
|
329
|
+ pageData = userService.findCustomerUsers(tenantId, edge.getCustomerId(), pageLink);
|
|
330
|
+ pushUsersToEdge(tenantId, pageData, edge);
|
332
|
331
|
if (pageData != null && pageData.hasNext()) {
|
333
|
332
|
pageLink = pageData.getNextPageLink();
|
334
|
333
|
}
|
...
|
...
|
@@ -336,45 +335,45 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
336
|
335
|
}
|
337
|
336
|
}
|
338
|
337
|
|
339
|
|
- private void pushUsersToEdge(TextPageData<User> pageData, Edge edge) {
|
|
338
|
+ private void pushUsersToEdge(TenantId tenantId, TextPageData<User> pageData, Edge edge) {
|
340
|
339
|
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
341
|
340
|
log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
|
342
|
341
|
for (User user : pageData.getData()) {
|
343
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null);
|
|
342
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null);
|
344
|
343
|
}
|
345
|
344
|
}
|
346
|
345
|
}
|
347
|
346
|
|
348
|
|
- private void syncWidgetsBundleAndWidgetTypes(Edge edge) {
|
349
|
|
- log.trace("[{}] syncWidgetsBundleAndWidgetTypes [{}]", edge.getTenantId(), edge.getName());
|
|
347
|
+ private void syncWidgetsBundleAndWidgetTypes(TenantId tenantId, Edge edge) {
|
|
348
|
+ log.trace("[{}] syncWidgetsBundleAndWidgetTypes [{}]", tenantId, edge.getName());
|
350
|
349
|
List<WidgetsBundle> widgetsBundlesToPush = new ArrayList<>();
|
351
|
350
|
List<WidgetType> widgetTypesToPush = new ArrayList<>();
|
352
|
|
- widgetsBundlesToPush.addAll(widgetsBundleService.findAllTenantWidgetsBundlesByTenantId(edge.getTenantId()));
|
353
|
|
- widgetsBundlesToPush.addAll(widgetsBundleService.findSystemWidgetsBundles(edge.getTenantId()));
|
|
351
|
+ widgetsBundlesToPush.addAll(widgetsBundleService.findAllTenantWidgetsBundlesByTenantId(tenantId));
|
|
352
|
+ widgetsBundlesToPush.addAll(widgetsBundleService.findSystemWidgetsBundles(tenantId));
|
354
|
353
|
try {
|
355
|
354
|
for (WidgetsBundle widgetsBundle: widgetsBundlesToPush) {
|
356
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.WIDGETS_BUNDLE, EdgeEventActionType.ADDED, widgetsBundle.getId(), null);
|
|
355
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE, EdgeEventActionType.ADDED, widgetsBundle.getId(), null);
|
357
|
356
|
widgetTypesToPush.addAll(widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundle.getTenantId(), widgetsBundle.getAlias()));
|
358
|
357
|
}
|
359
|
358
|
for (WidgetType widgetType: widgetTypesToPush) {
|
360
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null);
|
|
359
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null);
|
361
|
360
|
}
|
362
|
361
|
} catch (Exception e) {
|
363
|
362
|
log.error("Exception during loading widgets bundle(s) and widget type(s) on sync!", e);
|
364
|
363
|
}
|
365
|
364
|
}
|
366
|
365
|
|
367
|
|
- private void syncAdminSettings(Edge edge) {
|
368
|
|
- log.trace("[{}] syncAdminSettings [{}]", edge.getTenantId(), edge.getName());
|
|
366
|
+ private void syncAdminSettings(TenantId tenantId, Edge edge) {
|
|
367
|
+ log.trace("[{}] syncAdminSettings [{}]", tenantId, edge.getName());
|
369
|
368
|
try {
|
370
|
369
|
AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail");
|
371
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailSettings));
|
|
370
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailSettings));
|
372
|
371
|
AdminSettings tenantMailSettings = convertToTenantAdminSettings(systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue());
|
373
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailSettings));
|
|
372
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailSettings));
|
374
|
373
|
AdminSettings systemMailTemplates = loadMailTemplates();
|
375
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailTemplates));
|
|
374
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailTemplates));
|
376
|
375
|
AdminSettings tenantMailTemplates = convertToTenantAdminSettings(systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue());
|
377
|
|
- saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailTemplates));
|
|
376
|
+ saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailTemplates));
|
378
|
377
|
} catch (Exception e) {
|
379
|
378
|
log.error("Can't load admin settings", e);
|
380
|
379
|
}
|
...
|
...
|
@@ -433,13 +432,13 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
433
|
432
|
}
|
434
|
433
|
|
435
|
434
|
@Override
|
436
|
|
- public ListenableFuture<Void> processRuleChainMetadataRequestMsg(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
|
437
|
|
- log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", edge.getTenantId(), edge.getName(), ruleChainMetadataRequestMsg);
|
|
435
|
+ public ListenableFuture<Void> processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
|
|
436
|
+ log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", tenantId, edge.getName(), ruleChainMetadataRequestMsg);
|
438
|
437
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
439
|
438
|
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
|
440
|
439
|
RuleChainId ruleChainId =
|
441
|
440
|
new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
|
442
|
|
- ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null);
|
|
441
|
+ ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null);
|
443
|
442
|
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
|
444
|
443
|
@Override
|
445
|
444
|
public void onSuccess(@Nullable EdgeEvent result) {
|
...
|
...
|
@@ -457,8 +456,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
457
|
456
|
}
|
458
|
457
|
|
459
|
458
|
@Override
|
460
|
|
- public ListenableFuture<Void> processAttributesRequestMsg(Edge edge, AttributesRequestMsg attributesRequestMsg) {
|
461
|
|
- log.trace("[{}] processAttributesRequestMsg [{}][{}]", edge.getTenantId(), edge.getName(), attributesRequestMsg);
|
|
459
|
+ public ListenableFuture<Void> processAttributesRequestMsg(TenantId tenantId, Edge edge, AttributesRequestMsg attributesRequestMsg) {
|
|
460
|
+ log.trace("[{}] processAttributesRequestMsg [{}][{}]", tenantId, edge.getName(), attributesRequestMsg);
|
462
|
461
|
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
|
463
|
462
|
EntityType.valueOf(attributesRequestMsg.getEntityType()),
|
464
|
463
|
new UUID(attributesRequestMsg.getEntityIdMSB(), attributesRequestMsg.getEntityIdLSB()));
|
...
|
...
|
@@ -466,7 +465,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
466
|
465
|
if (type != null) {
|
467
|
466
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
468
|
467
|
String scope = attributesRequestMsg.getScope();
|
469
|
|
- ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, scope);
|
|
468
|
+ ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = attributesService.findAll(tenantId, entityId, scope);
|
470
|
469
|
Futures.addCallback(ssAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
|
471
|
470
|
@Override
|
472
|
471
|
public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
|
...
|
...
|
@@ -489,7 +488,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
489
|
488
|
entityData.put("scope", scope);
|
490
|
489
|
JsonNode body = mapper.valueToTree(entityData);
|
491
|
490
|
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body);
|
492
|
|
- saveEdgeEvent(edge.getTenantId(),
|
|
491
|
+ saveEdgeEvent(tenantId,
|
493
|
492
|
edge.getId(),
|
494
|
493
|
type,
|
495
|
494
|
EdgeEventActionType.ATTRIBUTES_UPDATED,
|
...
|
...
|
@@ -500,7 +499,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
500
|
499
|
throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e);
|
501
|
500
|
}
|
502
|
501
|
} else {
|
503
|
|
- log.trace("[{}][{}] No attributes found for entity {} [{}]", edge.getTenantId(),
|
|
502
|
+ log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
|
504
|
503
|
edge.getName(),
|
505
|
504
|
entityId.getEntityType(),
|
506
|
505
|
entityId.getId());
|
...
|
...
|
@@ -516,21 +515,21 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
516
|
515
|
}, dbCallbackExecutorService);
|
517
|
516
|
return futureToSet;
|
518
|
517
|
} else {
|
519
|
|
- log.warn("[{}] Type doesn't supported {}", edge.getTenantId(), entityId.getEntityType());
|
|
518
|
+ log.warn("[{}] Type doesn't supported {}", tenantId, entityId.getEntityType());
|
520
|
519
|
return Futures.immediateFuture(null);
|
521
|
520
|
}
|
522
|
521
|
}
|
523
|
522
|
|
524
|
523
|
@Override
|
525
|
|
- public ListenableFuture<Void> processRelationRequestMsg(Edge edge, RelationRequestMsg relationRequestMsg) {
|
526
|
|
- log.trace("[{}] processRelationRequestMsg [{}][{}]", edge.getTenantId(), edge.getName(), relationRequestMsg);
|
|
524
|
+ public ListenableFuture<Void> processRelationRequestMsg(TenantId tenantId, Edge edge, RelationRequestMsg relationRequestMsg) {
|
|
525
|
+ log.trace("[{}] processRelationRequestMsg [{}][{}]", tenantId, edge.getName(), relationRequestMsg);
|
527
|
526
|
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
|
528
|
527
|
EntityType.valueOf(relationRequestMsg.getEntityType()),
|
529
|
528
|
new UUID(relationRequestMsg.getEntityIdMSB(), relationRequestMsg.getEntityIdLSB()));
|
530
|
529
|
|
531
|
530
|
List<ListenableFuture<List<EntityRelation>>> futures = new ArrayList<>();
|
532
|
|
- futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.FROM));
|
533
|
|
- futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.TO));
|
|
531
|
+ futures.add(findRelationByQuery(tenantId, edge, entityId, EntitySearchDirection.FROM));
|
|
532
|
+ futures.add(findRelationByQuery(tenantId, edge, entityId, EntitySearchDirection.TO));
|
534
|
533
|
ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures);
|
535
|
534
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
536
|
535
|
Futures.addCallback(relationsListFuture, new FutureCallback<List<List<EntityRelation>>>() {
|
...
|
...
|
@@ -544,7 +543,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
544
|
543
|
try {
|
545
|
544
|
if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
|
546
|
545
|
!relation.getTo().getEntityType().equals(EntityType.EDGE)) {
|
547
|
|
- saveEdgeEvent(edge.getTenantId(),
|
|
546
|
+ saveEdgeEvent(tenantId,
|
548
|
547
|
edge.getId(),
|
549
|
548
|
EdgeEventType.RELATION,
|
550
|
549
|
EdgeEventActionType.ADDED,
|
...
|
...
|
@@ -568,26 +567,26 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
568
|
567
|
|
569
|
568
|
@Override
|
570
|
569
|
public void onFailure(Throwable t) {
|
571
|
|
- log.error("[{}] Can't find relation by query. Entity id [{}]", edge.getTenantId(), entityId, t);
|
|
570
|
+ log.error("[{}] Can't find relation by query. Entity id [{}]", tenantId, entityId, t);
|
572
|
571
|
futureToSet.setException(t);
|
573
|
572
|
}
|
574
|
573
|
}, dbCallbackExecutorService);
|
575
|
574
|
return futureToSet;
|
576
|
575
|
}
|
577
|
576
|
|
578
|
|
- private ListenableFuture<List<EntityRelation>> findRelationByQuery(Edge edge, EntityId entityId, EntitySearchDirection direction) {
|
|
577
|
+ private ListenableFuture<List<EntityRelation>> findRelationByQuery(TenantId tenantId, Edge edge, EntityId entityId, EntitySearchDirection direction) {
|
579
|
578
|
EntityRelationsQuery query = new EntityRelationsQuery();
|
580
|
579
|
query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false));
|
581
|
|
- return relationService.findByQuery(edge.getTenantId(), query);
|
|
580
|
+ return relationService.findByQuery(tenantId, query);
|
582
|
581
|
}
|
583
|
582
|
|
584
|
583
|
@Override
|
585
|
|
- public ListenableFuture<Void> processDeviceCredentialsRequestMsg(Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) {
|
586
|
|
- log.trace("[{}] processDeviceCredentialsRequestMsg [{}][{}]", edge.getTenantId(), edge.getName(), deviceCredentialsRequestMsg);
|
|
584
|
+ public ListenableFuture<Void> processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) {
|
|
585
|
+ log.trace("[{}] processDeviceCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), deviceCredentialsRequestMsg);
|
587
|
586
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
588
|
587
|
if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) {
|
589
|
588
|
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB()));
|
590
|
|
- ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null);
|
|
589
|
+ ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null);
|
591
|
590
|
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
|
592
|
591
|
@Override
|
593
|
592
|
public void onSuccess(@Nullable EdgeEvent result) {
|
...
|
...
|
@@ -605,12 +604,12 @@ public class DefaultSyncEdgeService implements SyncEdgeService { |
605
|
604
|
}
|
606
|
605
|
|
607
|
606
|
@Override
|
608
|
|
- public ListenableFuture<Void> processUserCredentialsRequestMsg(Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) {
|
609
|
|
- log.trace("[{}] processUserCredentialsRequestMsg [{}][{}]", edge.getTenantId(), edge.getName(), userCredentialsRequestMsg);
|
|
607
|
+ public ListenableFuture<Void> processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) {
|
|
608
|
+ log.trace("[{}] processUserCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), userCredentialsRequestMsg);
|
610
|
609
|
SettableFuture<Void> futureToSet = SettableFuture.create();
|
611
|
610
|
if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) {
|
612
|
611
|
UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB()));
|
613
|
|
- ListenableFuture<EdgeEvent> future = saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.CREDENTIALS_UPDATED, userId, null);
|
|
612
|
+ ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.CREDENTIALS_UPDATED, userId, null);
|
614
|
613
|
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
|
615
|
614
|
@Override
|
616
|
615
|
public void onSuccess(@Nullable EdgeEvent result) {
|
...
|
...
|
|