Commit aa0d05bb440e30715218b6a712802ece25a2630b

Authored by Igor Kulikov
Committed by GitHub
2 parents 11c25466 2ce04c37

Merge pull request #4957 from volodymyr-babak/edge-fix-concurrent-async-issue

[3.3.0] Added lock on edge event save/read. Removed save async method
... ... @@ -97,11 +97,9 @@ import java.util.Arrays;
97 97 import java.util.Collections;
98 98 import java.util.HashMap;
99 99 import java.util.HashSet;
100   -import java.util.LinkedHashMap;
101 100 import java.util.List;
102 101 import java.util.Map;
103 102 import java.util.Objects;
104   -import java.util.Optional;
105 103 import java.util.Set;
106 104 import java.util.UUID;
107 105 import java.util.function.Consumer;
... ... @@ -729,18 +727,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
729 727 edgeEvent.setBody(body);
730 728
731 729 edgeEvent.setEdgeId(edgeId);
732   - ListenableFuture<EdgeEvent> future = systemContext.getEdgeEventService().saveAsync(edgeEvent);
733   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
734   - @Override
735   - public void onSuccess(EdgeEvent result) {
736   - systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
737   - }
738   -
739   - @Override
740   - public void onFailure(Throwable t) {
741   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
742   - }
743   - }, systemContext.getDbCallbackExecutor());
  730 + systemContext.getEdgeEventService().save(edgeEvent);
  731 + systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
744 732 }
745 733
746 734 private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
... ...
... ... @@ -16,11 +16,7 @@
16 16 package org.thingsboard.server.service.edge;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19   -import com.google.common.util.concurrent.FutureCallback;
20   -import com.google.common.util.concurrent.Futures;
21   -import com.google.common.util.concurrent.ListenableFuture;
22 19 import lombok.extern.slf4j.Slf4j;
23   -import org.checkerframework.checker.nullness.qual.Nullable;
24 20 import org.springframework.beans.factory.annotation.Autowired;
25 21 import org.springframework.stereotype.Service;
26 22 import org.thingsboard.server.common.data.edge.Edge;
... ... @@ -41,7 +37,6 @@ import org.thingsboard.server.service.edge.rpc.processor.CustomerEdgeProcessor;
41 37 import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor;
42 38 import org.thingsboard.server.service.edge.rpc.processor.EntityEdgeProcessor;
43 39 import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor;
44   -import org.thingsboard.server.service.executors.DbCallbackExecutorService;
45 40 import org.thingsboard.server.service.queue.TbClusterService;
46 41
47 42 import javax.annotation.PostConstruct;
... ... @@ -66,9 +61,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
66 61 private TbClusterService clusterService;
67 62
68 63 @Autowired
69   - private DbCallbackExecutorService dbCallbackExecutorService;
70   -
71   - @Autowired
72 64 private EdgeProcessor edgeProcessor;
73 65
74 66 @Autowired
... ... @@ -123,19 +115,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
123 115 edgeEvent.setEntityId(entityId.getId());
124 116 }
125 117 edgeEvent.setBody(body);
126   - ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
127   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
128   - @Override
129   - public void onSuccess(@Nullable EdgeEvent result) {
130   - clusterService.onEdgeEventUpdate(tenantId, edgeId);
131   - }
132   -
133   - @Override
134   - public void onFailure(Throwable t) {
135   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
136   - }
137   - }, dbCallbackExecutorService);
138   -
  118 + edgeEventService.save(edgeEvent);
  119 + clusterService.onEdgeEventUpdate(tenantId, edgeId);
139 120 }
140 121
141 122 @Override
... ...
... ... @@ -259,7 +259,7 @@ public final class EdgeGrpcSession implements Closeable {
259 259 log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg());
260 260 }
261 261 if (sessionState.getPendingMsgsMap().isEmpty()) {
262   - log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg);
  262 + log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey());
263 263 if (sessionState.getScheduledSendDownlinkTask() != null) {
264 264 sessionState.getScheduledSendDownlinkTask().cancel(false);
265 265 }
... ...
... ... @@ -17,11 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
20   -import com.google.common.util.concurrent.FutureCallback;
21   -import com.google.common.util.concurrent.Futures;
22   -import com.google.common.util.concurrent.ListenableFuture;
23 20 import lombok.extern.slf4j.Slf4j;
24   -import org.checkerframework.checker.nullness.qual.Nullable;
25 21 import org.springframework.beans.factory.annotation.Autowired;
26 22 import org.thingsboard.server.common.data.HasCustomerId;
27 23 import org.thingsboard.server.common.data.edge.Edge;
... ... @@ -178,12 +174,12 @@ public abstract class BaseEdgeProcessor {
178 174 @Autowired
179 175 protected DbCallbackExecutorService dbCallbackExecutorService;
180 176
181   - protected ListenableFuture<EdgeEvent> saveEdgeEvent(TenantId tenantId,
182   - EdgeId edgeId,
183   - EdgeEventType type,
184   - EdgeEventActionType action,
185   - EntityId entityId,
186   - JsonNode body) {
  177 + protected void saveEdgeEvent(TenantId tenantId,
  178 + EdgeId edgeId,
  179 + EdgeEventType type,
  180 + EdgeEventActionType action,
  181 + EntityId entityId,
  182 + JsonNode body) {
187 183 log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type[{}], " +
188 184 "action [{}], entityId [{}], body [{}]",
189 185 tenantId, edgeId, type, action, entityId, body);
... ... @@ -197,19 +193,8 @@ public abstract class BaseEdgeProcessor {
197 193 edgeEvent.setEntityId(entityId.getId());
198 194 }
199 195 edgeEvent.setBody(body);
200   - ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
201   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
202   - @Override
203   - public void onSuccess(@Nullable EdgeEvent result) {
204   - tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
205   - }
206   -
207   - @Override
208   - public void onFailure(Throwable t) {
209   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
210   - }
211   - }, dbCallbackExecutorService);
212   - return future;
  196 + edgeEventService.save(edgeEvent);
  197 + tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
213 198 }
214 199
215 200 protected CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity, Edge edge) {
... ...
... ... @@ -105,19 +105,8 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
105 105 Device newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName);
106 106 ObjectNode body = mapper.createObjectNode();
107 107 body.put("conflictName", deviceName);
108   - ListenableFuture<EdgeEvent> future =
109   - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body);
110   - Futures.addCallback(future, new FutureCallback<>() {
111   - @Override
112   - public void onSuccess(EdgeEvent edgeEvent) {
113   - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null);
114   - }
115   -
116   - @Override
117   - public void onFailure(Throwable t) {
118   - log.error("[{}] Failed to save ENTITY_MERGE_REQUEST edge event [{}][{}]", tenantId, deviceUpdateMsg, edge.getId(), t);
119   - }
120   - }, dbCallbackExecutorService);
  108 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body);
  109 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null);
121 110 }
122 111 } while (pageData != null && pageData.hasNext());
123 112 } else {
... ...
... ... @@ -122,26 +122,13 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
122 122 @Override
123 123 public ListenableFuture<Void> processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
124 124 log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", tenantId, edge.getName(), ruleChainMetadataRequestMsg);
125   - SettableFuture<Void> futureToSet = SettableFuture.create();
126 125 if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
127 126 RuleChainId ruleChainId =
128 127 new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
129   - ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(),
  128 + saveEdgeEvent(tenantId, edge.getId(),
130 129 EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null);
131   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
132   - @Override
133   - public void onSuccess(@Nullable EdgeEvent result) {
134   - futureToSet.set(null);
135   - }
136   -
137   - @Override
138   - public void onFailure(Throwable t) {
139   - log.error("Can't save edge event [{}]", ruleChainMetadataRequestMsg, t);
140   - futureToSet.setException(t);
141   - }
142   - }, dbCallbackExecutorService);
143 130 }
144   - return futureToSet;
  131 + return Futures.immediateFuture(null);
145 132 }
146 133
147 134 @Override
... ... @@ -154,8 +141,8 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
154 141 if (type != null) {
155 142 SettableFuture<Void> futureToSet = SettableFuture.create();
156 143 String scope = attributesRequestMsg.getScope();
157   - ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = attributesService.findAll(tenantId, entityId, scope);
158   - Futures.addCallback(ssAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
  144 + ListenableFuture<List<AttributeKvEntry>> findAttrFuture = attributesService.findAll(tenantId, entityId, scope);
  145 + Futures.addCallback(findAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
159 146 @Override
160 147 public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
161 148 if (ssAttributes != null && !ssAttributes.isEmpty()) {
... ... @@ -184,8 +171,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
184 171 entityId,
185 172 body);
186 173 } catch (Exception e) {
187   - log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e);
188   - throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e);
  174 + log.error("[{}] Failed to save attribute updates to the edge", edge.getName(), e);
  175 + futureToSet.setException(new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e));
  176 + return;
189 177 }
190 178 } else {
191 179 log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
... ... @@ -198,7 +186,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
198 186
199 187 @Override
200 188 public void onFailure(Throwable t) {
201   - log.error("Can't save attributes [{}]", attributesRequestMsg, t);
  189 + log.error("Can't find attributes [{}]", attributesRequestMsg, t);
202 190 futureToSet.setException(t);
203 191 }
204 192 }, dbCallbackExecutorService);
... ... @@ -273,82 +261,39 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
273 261 @Override
274 262 public ListenableFuture<Void> processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) {
275 263 log.trace("[{}] processDeviceCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), deviceCredentialsRequestMsg);
276   - SettableFuture<Void> futureToSet = SettableFuture.create();
277 264 if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) {
278 265 DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB()));
279   - ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE,
  266 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE,
280 267 EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null);
281   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
282   - @Override
283   - public void onSuccess(@Nullable EdgeEvent result) {
284   - futureToSet.set(null);
285   - }
286   -
287   - @Override
288   - public void onFailure(Throwable t) {
289   - log.error("Can't save edge event [{}]", deviceCredentialsRequestMsg, t);
290   - futureToSet.setException(t);
291   - }
292   - }, dbCallbackExecutorService);
293 268 }
294   - return futureToSet;
  269 + return Futures.immediateFuture(null);
295 270 }
296 271
297 272 @Override
298 273 public ListenableFuture<Void> processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) {
299 274 log.trace("[{}] processUserCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), userCredentialsRequestMsg);
300   - SettableFuture<Void> futureToSet = SettableFuture.create();
301 275 if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) {
302 276 UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB()));
303   - ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
  277 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
304 278 EdgeEventActionType.CREDENTIALS_UPDATED, userId, null);
305   - Futures.addCallback(future, new FutureCallback<>() {
306   - @Override
307   - public void onSuccess(@Nullable EdgeEvent result) {
308   - futureToSet.set(null);
309   - }
310   -
311   - @Override
312   - public void onFailure(Throwable t) {
313   - log.error("Can't save edge event [{}]", userCredentialsRequestMsg, t);
314   - futureToSet.setException(t);
315   - }
316   - }, dbCallbackExecutorService);
317 279 }
318   - return futureToSet;
  280 + return Futures.immediateFuture(null);
319 281 }
320 282
321 283 @Override
322 284 public ListenableFuture<Void> processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg) {
323 285 log.trace("[{}] processDeviceProfileDevicesRequestMsg [{}][{}]", tenantId, edge.getName(), deviceProfileDevicesRequestMsg);
324   - SettableFuture<Void> futureToSet = SettableFuture.create();
325 286 if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() != 0 && deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() != 0) {
326 287 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB()));
327 288 DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
328   - List<ListenableFuture<EdgeEvent>> futures;
329 289 if (deviceProfileById != null) {
330   - futures = syncDevices(tenantId, edge, deviceProfileById.getName());
331   - } else {
332   - futures = new ArrayList<>();
  290 + syncDevices(tenantId, edge, deviceProfileById.getName());
333 291 }
334   - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
335   - @Override
336   - public void onSuccess(@Nullable List<EdgeEvent> result) {
337   - futureToSet.set(null);
338   - }
339   -
340   - @Override
341   - public void onFailure(Throwable t) {
342   - log.error("Can't sync devices by device profile [{}]", deviceProfileDevicesRequestMsg, t);
343   - futureToSet.setException(t);
344   - }
345   - }, dbCallbackExecutorService);
346 292 }
347   - return futureToSet;
  293 + return Futures.immediateFuture(null);
348 294 }
349 295
350   - private List<ListenableFuture<EdgeEvent>> syncDevices(TenantId tenantId, Edge edge, String deviceType) {
351   - List<ListenableFuture<EdgeEvent>> futures = new ArrayList<>();
  296 + private void syncDevices(TenantId tenantId, Edge edge, String deviceType) {
352 297 log.trace("[{}] syncDevices [{}][{}]", tenantId, edge.getName(), deviceType);
353 298 try {
354 299 PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
... ... @@ -358,7 +303,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
358 303 if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
359 304 log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
360 305 for (Device device : pageData.getData()) {
361   - futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null));
  306 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null);
362 307 }
363 308 if (pageData.hasNext()) {
364 309 pageLink = pageLink.nextPageLink();
... ... @@ -368,40 +313,25 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
368 313 } catch (Exception e) {
369 314 log.error("Exception during loading edge device(s) on sync!", e);
370 315 }
371   - return futures;
372 316 }
373 317
374 318 @Override
375 319 public ListenableFuture<Void> processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge,
376 320 WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg) {
377 321 log.trace("[{}] processWidgetBundleTypesRequestMsg [{}][{}]", tenantId, edge.getName(), widgetBundleTypesRequestMsg);
378   - SettableFuture<Void> futureToSet = SettableFuture.create();
379 322 if (widgetBundleTypesRequestMsg.getWidgetBundleIdMSB() != 0 && widgetBundleTypesRequestMsg.getWidgetBundleIdLSB() != 0) {
380 323 WidgetsBundleId widgetsBundleId = new WidgetsBundleId(new UUID(widgetBundleTypesRequestMsg.getWidgetBundleIdMSB(), widgetBundleTypesRequestMsg.getWidgetBundleIdLSB()));
381 324 WidgetsBundle widgetsBundleById = widgetsBundleService.findWidgetsBundleById(tenantId, widgetsBundleId);
382   - List<ListenableFuture<EdgeEvent>> futures = new ArrayList<>();
383 325 if (widgetsBundleById != null) {
384 326 List<WidgetType> widgetTypesToPush =
385 327 widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundleById.getTenantId(), widgetsBundleById.getAlias());
386 328
387 329 for (WidgetType widgetType : widgetTypesToPush) {
388   - futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null));
  330 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null);
389 331 }
390 332 }
391   - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
392   - @Override
393   - public void onSuccess(@Nullable List<EdgeEvent> result) {
394   - futureToSet.set(null);
395   - }
396   -
397   - @Override
398   - public void onFailure(Throwable t) {
399   - log.error("Can't sync widget types by widget bundle [{}]", widgetBundleTypesRequestMsg, t);
400   - futureToSet.setException(t);
401   - }
402   - }, dbCallbackExecutorService);
403 333 }
404   - return futureToSet;
  334 + return Futures.immediateFuture(null);
405 335 }
406 336
407 337 @Override
... ... @@ -416,9 +346,12 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
416 346 public void onSuccess(@Nullable List<EntityView> entityViews) {
417 347 try {
418 348 if (entityViews != null && !entityViews.isEmpty()) {
  349 + List<ListenableFuture<Boolean>> futures = new ArrayList<>();
419 350 for (EntityView entityView : entityViews) {
420   - Futures.addCallback(relationService.checkRelation(tenantId, edge.getId(), entityView.getId(),
421   - EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE), new FutureCallback<>() {
  351 + ListenableFuture<Boolean> future = relationService.checkRelation(tenantId, edge.getId(), entityView.getId(),
  352 + EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
  353 + futures.add(future);
  354 + Futures.addCallback(future, new FutureCallback<>() {
422 355 @Override
423 356 public void onSuccess(@Nullable Boolean result) {
424 357 if (Boolean.TRUE.equals(result)) {
... ... @@ -426,16 +359,27 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
426 359 EdgeEventActionType.ADDED, entityView.getId(), null);
427 360 }
428 361 }
429   -
430 362 @Override
431 363 public void onFailure(Throwable t) {
432   - log.error("Exception during loading relation [{}] to edge on sync!", t, t);
433   - futureToSet.setException(t);
  364 + // Do nothing - error handles in allAsList
434 365 }
435 366 }, dbCallbackExecutorService);
436 367 }
  368 + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
  369 + @Override
  370 + public void onSuccess(@Nullable List<Boolean> result) {
  371 + futureToSet.set(null);
  372 + }
  373 +
  374 + @Override
  375 + public void onFailure(Throwable t) {
  376 + log.error("Exception during loading relation [{}] to edge on sync!", t, t);
  377 + futureToSet.setException(t);
  378 + }
  379 + }, dbCallbackExecutorService);
  380 + } else {
  381 + futureToSet.set(null);
437 382 }
438   - futureToSet.set(null);
439 383 } catch (Exception e) {
440 384 log.error("Exception during loading relation(s) to edge on sync!", e);
441 385 futureToSet.setException(e);
... ... @@ -451,30 +395,19 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
451 395 return futureToSet;
452 396 }
453 397
454   - private ListenableFuture<EdgeEvent> saveEdgeEvent(TenantId tenantId,
455   - EdgeId edgeId,
456   - EdgeEventType type,
457   - EdgeEventActionType action,
458   - EntityId entityId,
459   - JsonNode body) {
  398 + private void saveEdgeEvent(TenantId tenantId,
  399 + EdgeId edgeId,
  400 + EdgeEventType type,
  401 + EdgeEventActionType action,
  402 + EntityId entityId,
  403 + JsonNode body) {
460 404 log.trace("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], type [{}], action[{}], entityId [{}], body [{}]",
461 405 tenantId, edgeId, type, action, entityId, body);
462 406
463 407 EdgeEvent edgeEvent = EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
464 408
465   - ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
466   - Futures.addCallback(future, new FutureCallback<>() {
467   - @Override
468   - public void onSuccess(@Nullable EdgeEvent result) {
469   - tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
470   - }
471   -
472   - @Override
473   - public void onFailure(Throwable t) {
474   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
475   - }
476   - }, dbCallbackExecutorService);
477   - return future;
  409 + edgeEventService.save(edgeEvent);
  410 + tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
478 411 }
479 412
480 413 }
... ...
... ... @@ -939,7 +939,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
939 939 String timeseriesData = "{\"data\":{\"temperature\":25},\"ts\":" + System.currentTimeMillis() + "}";
940 940 JsonNode timeseriesEntityData = mapper.readTree(timeseriesData);
941 941 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData);
942   - edgeEventService.saveAsync(edgeEvent);
  942 + edgeEventService.save(edgeEvent);
943 943 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
944 944 Assert.assertTrue(edgeImitator.waitForMessages());
945 945
... ... @@ -978,7 +978,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
978 978 JsonNode attributesEntityData = mapper.readTree(attributesData);
979 979 EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, attributesEntityData);
980 980 edgeImitator.expectMessageAmount(1);
981   - edgeEventService.saveAsync(edgeEvent1);
  981 + edgeEventService.save(edgeEvent1);
982 982 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
983 983 Assert.assertTrue(edgeImitator.waitForMessages());
984 984
... ... @@ -1003,7 +1003,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1003 1003 JsonNode postAttributesEntityData = mapper.readTree(postAttributesData);
1004 1004 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.POST_ATTRIBUTES, device.getId().getId(), EdgeEventType.DEVICE, postAttributesEntityData);
1005 1005 edgeImitator.expectMessageAmount(1);
1006   - edgeEventService.saveAsync(edgeEvent);
  1006 + edgeEventService.save(edgeEvent);
1007 1007 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
1008 1008 Assert.assertTrue(edgeImitator.waitForMessages());
1009 1009
... ... @@ -1028,7 +1028,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1028 1028 JsonNode deleteAttributesEntityData = mapper.readTree(deleteAttributesData);
1029 1029 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_DELETED, device.getId().getId(), EdgeEventType.DEVICE, deleteAttributesEntityData);
1030 1030 edgeImitator.expectMessageAmount(1);
1031   - edgeEventService.saveAsync(edgeEvent);
  1031 + edgeEventService.save(edgeEvent);
1032 1032 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
1033 1033 Assert.assertTrue(edgeImitator.waitForMessages());
1034 1034
... ... @@ -1062,7 +1062,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1062 1062
1063 1063 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body);
1064 1064 edgeImitator.expectMessageAmount(1);
1065   - edgeEventService.saveAsync(edgeEvent);
  1065 + edgeEventService.save(edgeEvent);
1066 1066 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
1067 1067 Assert.assertTrue(edgeImitator.waitForMessages());
1068 1068
... ... @@ -1088,7 +1088,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1088 1088 JsonNode timeseriesEntityData = mapper.readTree(timeseriesData);
1089 1089 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED,
1090 1090 device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData);
1091   - edgeEventService.saveAsync(edgeEvent);
  1091 + edgeEventService.save(edgeEvent);
1092 1092 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
1093 1093 }
1094 1094
... ...
... ... @@ -15,7 +15,6 @@
15 15 */
16 16 package org.thingsboard.server.dao.edge;
17 17
18   -import com.google.common.util.concurrent.ListenableFuture;
19 18 import org.thingsboard.server.common.data.edge.EdgeEvent;
20 19 import org.thingsboard.server.common.data.id.EdgeId;
21 20 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -24,7 +23,7 @@ import org.thingsboard.server.common.data.page.TimePageLink;
24 23
25 24 public interface EdgeEventService {
26 25
27   - ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent);
  26 + EdgeEvent save(EdgeEvent edgeEvent);
28 27
29 28 PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate);
30 29
... ...
... ... @@ -15,9 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.dao.edge;
17 17
18   -import com.google.common.util.concurrent.ListenableFuture;
19 18 import lombok.extern.slf4j.Slf4j;
20   -import org.apache.commons.lang3.StringUtils;
21 19 import org.springframework.beans.factory.annotation.Autowired;
22 20 import org.springframework.stereotype.Service;
23 21 import org.thingsboard.server.common.data.edge.EdgeEvent;
... ... @@ -36,9 +34,9 @@ public class BaseEdgeEventService implements EdgeEventService {
36 34 private EdgeEventDao edgeEventDao;
37 35
38 36 @Override
39   - public ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent) {
  37 + public EdgeEvent save(EdgeEvent edgeEvent) {
40 38 edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId);
41   - return edgeEventDao.saveAsync(edgeEvent);
  39 + return edgeEventDao.save(edgeEvent);
42 40 }
43 41
44 42 @Override
... ...
... ... @@ -30,12 +30,12 @@ import java.util.UUID;
30 30 public interface EdgeEventDao extends Dao<EdgeEvent> {
31 31
32 32 /**
33   - * Save or update edge event object async
  33 + * Save or update edge event object
34 34 *
35 35 * @param edgeEvent the event object
36 36 * @return saved edge event object future
37 37 */
38   - ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent);
  38 + EdgeEvent save(EdgeEvent edgeEvent);
39 39
40 40
41 41 /**
... ...
... ... @@ -41,6 +41,10 @@ import java.sql.ResultSet;
41 41 import java.sql.SQLException;
42 42 import java.util.Optional;
43 43 import java.util.UUID;
  44 +import java.util.concurrent.ConcurrentHashMap;
  45 +import java.util.concurrent.ConcurrentMap;
  46 +import java.util.concurrent.locks.Lock;
  47 +import java.util.concurrent.locks.ReentrantLock;
44 48
45 49 import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
46 50
... ... @@ -50,6 +54,8 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
50 54
51 55 private final UUID systemTenantId = NULL_UUID;
52 56
  57 + private final ConcurrentMap<EdgeId, Lock> readWriteLocks = new ConcurrentHashMap<>();
  58 +
53 59 @Autowired
54 60 private EdgeEventRepository edgeEventRepository;
55 61
... ... @@ -64,47 +70,59 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
64 70 }
65 71
66 72 @Override
67   - public ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent) {
68   - log.debug("Save edge event [{}] ", edgeEvent);
69   - if (edgeEvent.getId() == null) {
70   - UUID timeBased = Uuids.timeBased();
71   - edgeEvent.setId(new EdgeEventId(timeBased));
72   - edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased));
73   - } else if (edgeEvent.getCreatedTime() == 0L) {
74   - UUID eventId = edgeEvent.getId().getId();
75   - if (eventId.version() == 1) {
76   - edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId));
77   - } else {
78   - edgeEvent.setCreatedTime(System.currentTimeMillis());
  73 + public EdgeEvent save(EdgeEvent edgeEvent) {
  74 + final Lock readWriteLock = readWriteLocks.computeIfAbsent(edgeEvent.getEdgeId(), id -> new ReentrantLock());
  75 + readWriteLock.lock();
  76 + try {
  77 + log.debug("Save edge event [{}] ", edgeEvent);
  78 + if (edgeEvent.getId() == null) {
  79 + UUID timeBased = Uuids.timeBased();
  80 + edgeEvent.setId(new EdgeEventId(timeBased));
  81 + edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased));
  82 + } else if (edgeEvent.getCreatedTime() == 0L) {
  83 + UUID eventId = edgeEvent.getId().getId();
  84 + if (eventId.version() == 1) {
  85 + edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId));
  86 + } else {
  87 + edgeEvent.setCreatedTime(System.currentTimeMillis());
  88 + }
79 89 }
  90 + if (StringUtils.isEmpty(edgeEvent.getUid())) {
  91 + edgeEvent.setUid(edgeEvent.getId().toString());
  92 + }
  93 + return save(new EdgeEventEntity(edgeEvent)).orElse(null);
  94 + } finally {
  95 + readWriteLock.unlock();
80 96 }
81   - if (StringUtils.isEmpty(edgeEvent.getUid())) {
82   - edgeEvent.setUid(edgeEvent.getId().toString());
83   - }
84   - return service.submit(() -> save(new EdgeEventEntity(edgeEvent)).orElse(null));
85 97 }
86 98
87 99 @Override
88 100 public PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) {
89   - if (withTsUpdate) {
90   - return DaoUtil.toPageData(
91   - edgeEventRepository
92   - .findEdgeEventsByTenantIdAndEdgeId(
93   - tenantId,
94   - edgeId.getId(),
95   - pageLink.getStartTime(),
96   - pageLink.getEndTime(),
97   - DaoUtil.toPageable(pageLink)));
98   - } else {
99   - return DaoUtil.toPageData(
100   - edgeEventRepository
101   - .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(
102   - tenantId,
103   - edgeId.getId(),
104   - pageLink.getStartTime(),
105   - pageLink.getEndTime(),
106   - DaoUtil.toPageable(pageLink)));
  101 + final Lock readWriteLock = readWriteLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
  102 + readWriteLock.lock();
  103 + try {
  104 + if (withTsUpdate) {
  105 + return DaoUtil.toPageData(
  106 + edgeEventRepository
  107 + .findEdgeEventsByTenantIdAndEdgeId(
  108 + tenantId,
  109 + edgeId.getId(),
  110 + pageLink.getStartTime(),
  111 + pageLink.getEndTime(),
  112 + DaoUtil.toPageable(pageLink)));
  113 + } else {
  114 + return DaoUtil.toPageData(
  115 + edgeEventRepository
  116 + .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(
  117 + tenantId,
  118 + edgeId.getId(),
  119 + pageLink.getStartTime(),
  120 + pageLink.getEndTime(),
  121 + DaoUtil.toPageable(pageLink)));
107 122
  123 + }
  124 + } finally {
  125 + readWriteLock.unlock();
108 126 }
109 127 }
110 128
... ...
... ... @@ -42,7 +42,7 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
42 42 EdgeId edgeId = new EdgeId(Uuids.timeBased());
43 43 DeviceId deviceId = new DeviceId(Uuids.timeBased());
44 44 EdgeEvent edgeEvent = generateEdgeEvent(null, edgeId, deviceId, EdgeEventActionType.ADDED);
45   - EdgeEvent saved = edgeEventService.saveAsync(edgeEvent).get();
  45 + EdgeEvent saved = edgeEventService.save(edgeEvent);
46 46 Assert.assertEquals(saved.getTenantId(), edgeEvent.getTenantId());
47 47 Assert.assertEquals(saved.getEdgeId(), edgeEvent.getEdgeId());
48 48 Assert.assertEquals(saved.getEntityId(), edgeEvent.getEntityId());
... ... @@ -109,7 +109,7 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
109 109 TimePageLink pageLink = new TimePageLink(1);
110 110
111 111 EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED);
112   - edgeEventService.saveAsync(edgeEventWithTsUpdate).get();
  112 + edgeEventService.save(edgeEventWithTsUpdate);
113 113
114 114 PageData<EdgeEvent> allEdgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true);
115 115 PageData<EdgeEvent> edgeEventsWithoutTsUpdate = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false);
... ... @@ -124,6 +124,6 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
124 124 private EdgeEvent saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception {
125 125 EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED);
126 126 edgeEvent.setId(new EdgeEventId(Uuids.startOf(time)));
127   - return edgeEventService.saveAsync(edgeEvent).get();
  127 + return edgeEventService.save(edgeEvent);
128 128 }
129 129 }
\ No newline at end of file
... ...
... ... @@ -149,20 +149,9 @@ public class TbMsgPushToEdgeNode implements TbNode {
149 149
150 150 private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) {
151 151 edgeEvent.setEdgeId(edgeId);
152   - ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
153   - Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
154   - @Override
155   - public void onSuccess(@Nullable EdgeEvent event) {
156   - ctx.tellNext(msg, SUCCESS);
157   - ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
158   - }
159   -
160   - @Override
161   - public void onFailure(Throwable th) {
162   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", ctx.getTenantId().getId(), edgeEvent, edgeId.getId(), th);
163   - ctx.tellFailure(msg, th);
164   - }
165   - }, ctx.getDbCallbackExecutor());
  152 + ctx.getEdgeEventService().save(edgeEvent);
  153 + ctx.tellNext(msg, SUCCESS);
  154 + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
166 155 }
167 156
168 157 private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) {
... ...