Commit 3ae5acbcc8eaa47a9459256b994a60d3626a48c2

Authored by Volodymyr Babak
1 parent 5c8618c3

Added lock on edge event save/read. Removed save async method

... ... @@ -94,11 +94,9 @@ import java.util.Arrays;
94 94 import java.util.Collections;
95 95 import java.util.HashMap;
96 96 import java.util.HashSet;
97   -import java.util.LinkedHashMap;
98 97 import java.util.List;
99 98 import java.util.Map;
100 99 import java.util.Objects;
101   -import java.util.Optional;
102 100 import java.util.Set;
103 101 import java.util.UUID;
104 102 import java.util.function.Consumer;
... ... @@ -703,18 +701,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
703 701 edgeEvent.setBody(body);
704 702
705 703 edgeEvent.setEdgeId(edgeId);
706   - ListenableFuture<EdgeEvent> future = systemContext.getEdgeEventService().saveAsync(edgeEvent);
707   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
708   - @Override
709   - public void onSuccess(EdgeEvent result) {
710   - systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
711   - }
712   -
713   - @Override
714   - public void onFailure(Throwable t) {
715   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
716   - }
717   - }, systemContext.getDbCallbackExecutor());
  704 + systemContext.getEdgeEventService().save(edgeEvent);
  705 + systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
718 706 }
719 707
720 708 private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
... ...
... ... @@ -16,11 +16,7 @@
16 16 package org.thingsboard.server.service.edge;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19   -import com.google.common.util.concurrent.FutureCallback;
20   -import com.google.common.util.concurrent.Futures;
21   -import com.google.common.util.concurrent.ListenableFuture;
22 19 import lombok.extern.slf4j.Slf4j;
23   -import org.checkerframework.checker.nullness.qual.Nullable;
24 20 import org.springframework.beans.factory.annotation.Autowired;
25 21 import org.springframework.stereotype.Service;
26 22 import org.thingsboard.server.common.data.edge.Edge;
... ... @@ -123,19 +119,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
123 119 edgeEvent.setEntityId(entityId.getId());
124 120 }
125 121 edgeEvent.setBody(body);
126   - ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
127   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
128   - @Override
129   - public void onSuccess(@Nullable EdgeEvent result) {
130   - clusterService.onEdgeEventUpdate(tenantId, edgeId);
131   - }
132   -
133   - @Override
134   - public void onFailure(Throwable t) {
135   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
136   - }
137   - }, dbCallbackExecutorService);
138   -
  122 + edgeEventService.save(edgeEvent);
  123 + clusterService.onEdgeEventUpdate(tenantId, edgeId);
139 124 }
140 125
141 126 @Override
... ...
... ... @@ -259,7 +259,7 @@ public final class EdgeGrpcSession implements Closeable {
259 259 log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg());
260 260 }
261 261 if (sessionState.getPendingMsgsMap().isEmpty()) {
262   - log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg);
  262 + log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey());
263 263 if (sessionState.getScheduledSendDownlinkTask() != null) {
264 264 sessionState.getScheduledSendDownlinkTask().cancel(false);
265 265 }
... ...
... ... @@ -17,11 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
20   -import com.google.common.util.concurrent.FutureCallback;
21   -import com.google.common.util.concurrent.Futures;
22   -import com.google.common.util.concurrent.ListenableFuture;
23 20 import lombok.extern.slf4j.Slf4j;
24   -import org.checkerframework.checker.nullness.qual.Nullable;
25 21 import org.springframework.beans.factory.annotation.Autowired;
26 22 import org.thingsboard.server.common.data.HasCustomerId;
27 23 import org.thingsboard.server.common.data.edge.Edge;
... ... @@ -178,7 +174,7 @@ public abstract class BaseEdgeProcessor {
178 174 @Autowired
179 175 protected DbCallbackExecutorService dbCallbackExecutorService;
180 176
181   - protected ListenableFuture<EdgeEvent> saveEdgeEvent(TenantId tenantId,
  177 + protected void saveEdgeEvent(TenantId tenantId,
182 178 EdgeId edgeId,
183 179 EdgeEventType type,
184 180 EdgeEventActionType action,
... ... @@ -197,19 +193,8 @@ public abstract class BaseEdgeProcessor {
197 193 edgeEvent.setEntityId(entityId.getId());
198 194 }
199 195 edgeEvent.setBody(body);
200   - ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
201   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
202   - @Override
203   - public void onSuccess(@Nullable EdgeEvent result) {
204   - tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
205   - }
206   -
207   - @Override
208   - public void onFailure(Throwable t) {
209   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
210   - }
211   - }, dbCallbackExecutorService);
212   - return future;
  196 + edgeEventService.save(edgeEvent);
  197 + tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
213 198 }
214 199
215 200 protected CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity, Edge edge) {
... ...
... ... @@ -105,19 +105,8 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
105 105 Device newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName);
106 106 ObjectNode body = mapper.createObjectNode();
107 107 body.put("conflictName", deviceName);
108   - ListenableFuture<EdgeEvent> future =
109   - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body);
110   - Futures.addCallback(future, new FutureCallback<>() {
111   - @Override
112   - public void onSuccess(EdgeEvent edgeEvent) {
113   - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null);
114   - }
115   -
116   - @Override
117   - public void onFailure(Throwable t) {
118   - log.error("[{}] Failed to save ENTITY_MERGE_REQUEST edge event [{}][{}]", tenantId, deviceUpdateMsg, edge.getId(), t);
119   - }
120   - }, dbCallbackExecutorService);
  108 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body);
  109 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null);
121 110 }
122 111 } while (pageData != null && pageData.hasNext());
123 112 } else {
... ...
... ... @@ -122,26 +122,13 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
122 122 @Override
123 123 public ListenableFuture<Void> processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
124 124 log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", tenantId, edge.getName(), ruleChainMetadataRequestMsg);
125   - SettableFuture<Void> futureToSet = SettableFuture.create();
126 125 if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
127 126 RuleChainId ruleChainId =
128 127 new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
129   - ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(),
  128 + saveEdgeEvent(tenantId, edge.getId(),
130 129 EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null);
131   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
132   - @Override
133   - public void onSuccess(@Nullable EdgeEvent result) {
134   - futureToSet.set(null);
135   - }
136   -
137   - @Override
138   - public void onFailure(Throwable t) {
139   - log.error("Can't save edge event [{}]", ruleChainMetadataRequestMsg, t);
140   - futureToSet.setException(t);
141   - }
142   - }, dbCallbackExecutorService);
143 130 }
144   - return futureToSet;
  131 + return Futures.immediateFuture(null);
145 132 }
146 133
147 134 @Override
... ... @@ -273,82 +260,39 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
273 260 @Override
274 261 public ListenableFuture<Void> processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) {
275 262 log.trace("[{}] processDeviceCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), deviceCredentialsRequestMsg);
276   - SettableFuture<Void> futureToSet = SettableFuture.create();
277 263 if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) {
278 264 DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB()));
279   - ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE,
  265 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE,
280 266 EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null);
281   - Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
282   - @Override
283   - public void onSuccess(@Nullable EdgeEvent result) {
284   - futureToSet.set(null);
285   - }
286   -
287   - @Override
288   - public void onFailure(Throwable t) {
289   - log.error("Can't save edge event [{}]", deviceCredentialsRequestMsg, t);
290   - futureToSet.setException(t);
291   - }
292   - }, dbCallbackExecutorService);
293 267 }
294   - return futureToSet;
  268 + return Futures.immediateFuture(null);
295 269 }
296 270
297 271 @Override
298 272 public ListenableFuture<Void> processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) {
299 273 log.trace("[{}] processUserCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), userCredentialsRequestMsg);
300   - SettableFuture<Void> futureToSet = SettableFuture.create();
301 274 if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) {
302 275 UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB()));
303   - ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
  276 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
304 277 EdgeEventActionType.CREDENTIALS_UPDATED, userId, null);
305   - Futures.addCallback(future, new FutureCallback<>() {
306   - @Override
307   - public void onSuccess(@Nullable EdgeEvent result) {
308   - futureToSet.set(null);
309   - }
310   -
311   - @Override
312   - public void onFailure(Throwable t) {
313   - log.error("Can't save edge event [{}]", userCredentialsRequestMsg, t);
314   - futureToSet.setException(t);
315   - }
316   - }, dbCallbackExecutorService);
317 278 }
318   - return futureToSet;
  279 + return Futures.immediateFuture(null);
319 280 }
320 281
321 282 @Override
322 283 public ListenableFuture<Void> processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg) {
323 284 log.trace("[{}] processDeviceProfileDevicesRequestMsg [{}][{}]", tenantId, edge.getName(), deviceProfileDevicesRequestMsg);
324   - SettableFuture<Void> futureToSet = SettableFuture.create();
325 285 if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() != 0 && deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() != 0) {
326 286 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB()));
327 287 DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
328   - List<ListenableFuture<EdgeEvent>> futures;
329 288 if (deviceProfileById != null) {
330   - futures = syncDevices(tenantId, edge, deviceProfileById.getName());
331   - } else {
332   - futures = new ArrayList<>();
  289 + syncDevices(tenantId, edge, deviceProfileById.getName());
333 290 }
334   - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
335   - @Override
336   - public void onSuccess(@Nullable List<EdgeEvent> result) {
337   - futureToSet.set(null);
338   - }
339   -
340   - @Override
341   - public void onFailure(Throwable t) {
342   - log.error("Can't sync devices by device profile [{}]", deviceProfileDevicesRequestMsg, t);
343   - futureToSet.setException(t);
344   - }
345   - }, dbCallbackExecutorService);
346 291 }
347   - return futureToSet;
  292 + return Futures.immediateFuture(null);
348 293 }
349 294
350   - private List<ListenableFuture<EdgeEvent>> syncDevices(TenantId tenantId, Edge edge, String deviceType) {
351   - List<ListenableFuture<EdgeEvent>> futures = new ArrayList<>();
  295 + private void syncDevices(TenantId tenantId, Edge edge, String deviceType) {
352 296 log.trace("[{}] syncDevices [{}][{}]", tenantId, edge.getName(), deviceType);
353 297 try {
354 298 PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
... ... @@ -358,7 +302,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
358 302 if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
359 303 log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
360 304 for (Device device : pageData.getData()) {
361   - futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null));
  305 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null);
362 306 }
363 307 if (pageData.hasNext()) {
364 308 pageLink = pageLink.nextPageLink();
... ... @@ -368,40 +312,25 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
368 312 } catch (Exception e) {
369 313 log.error("Exception during loading edge device(s) on sync!", e);
370 314 }
371   - return futures;
372 315 }
373 316
374 317 @Override
375 318 public ListenableFuture<Void> processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge,
376 319 WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg) {
377 320 log.trace("[{}] processWidgetBundleTypesRequestMsg [{}][{}]", tenantId, edge.getName(), widgetBundleTypesRequestMsg);
378   - SettableFuture<Void> futureToSet = SettableFuture.create();
379 321 if (widgetBundleTypesRequestMsg.getWidgetBundleIdMSB() != 0 && widgetBundleTypesRequestMsg.getWidgetBundleIdLSB() != 0) {
380 322 WidgetsBundleId widgetsBundleId = new WidgetsBundleId(new UUID(widgetBundleTypesRequestMsg.getWidgetBundleIdMSB(), widgetBundleTypesRequestMsg.getWidgetBundleIdLSB()));
381 323 WidgetsBundle widgetsBundleById = widgetsBundleService.findWidgetsBundleById(tenantId, widgetsBundleId);
382   - List<ListenableFuture<EdgeEvent>> futures = new ArrayList<>();
383 324 if (widgetsBundleById != null) {
384 325 List<WidgetType> widgetTypesToPush =
385 326 widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundleById.getTenantId(), widgetsBundleById.getAlias());
386 327
387 328 for (WidgetType widgetType : widgetTypesToPush) {
388   - futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null));
  329 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null);
389 330 }
390 331 }
391   - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
392   - @Override
393   - public void onSuccess(@Nullable List<EdgeEvent> result) {
394   - futureToSet.set(null);
395   - }
396   -
397   - @Override
398   - public void onFailure(Throwable t) {
399   - log.error("Can't sync widget types by widget bundle [{}]", widgetBundleTypesRequestMsg, t);
400   - futureToSet.setException(t);
401   - }
402   - }, dbCallbackExecutorService);
403 332 }
404   - return futureToSet;
  333 + return Futures.immediateFuture(null);
405 334 }
406 335
407 336 @Override
... ... @@ -425,6 +354,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
425 354 saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW,
426 355 EdgeEventActionType.ADDED, entityView.getId(), null);
427 356 }
  357 + futureToSet.set(null);
428 358 }
429 359
430 360 @Override
... ... @@ -434,8 +364,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
434 364 }
435 365 }, dbCallbackExecutorService);
436 366 }
  367 + } else {
  368 + futureToSet.set(null);
437 369 }
438   - futureToSet.set(null);
439 370 } catch (Exception e) {
440 371 log.error("Exception during loading relation(s) to edge on sync!", e);
441 372 futureToSet.setException(e);
... ... @@ -451,7 +382,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
451 382 return futureToSet;
452 383 }
453 384
454   - private ListenableFuture<EdgeEvent> saveEdgeEvent(TenantId tenantId,
  385 + private void saveEdgeEvent(TenantId tenantId,
455 386 EdgeId edgeId,
456 387 EdgeEventType type,
457 388 EdgeEventActionType action,
... ... @@ -462,19 +393,8 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
462 393
463 394 EdgeEvent edgeEvent = EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
464 395
465   - ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
466   - Futures.addCallback(future, new FutureCallback<>() {
467   - @Override
468   - public void onSuccess(@Nullable EdgeEvent result) {
469   - tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
470   - }
471   -
472   - @Override
473   - public void onFailure(Throwable t) {
474   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
475   - }
476   - }, dbCallbackExecutorService);
477   - return future;
  396 + edgeEventService.save(edgeEvent);
  397 + tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
478 398 }
479 399
480 400 }
... ...
... ... @@ -939,7 +939,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
939 939 String timeseriesData = "{\"data\":{\"temperature\":25},\"ts\":" + System.currentTimeMillis() + "}";
940 940 JsonNode timeseriesEntityData = mapper.readTree(timeseriesData);
941 941 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData);
942   - edgeEventService.saveAsync(edgeEvent);
  942 + edgeEventService.save(edgeEvent);
943 943 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
944 944 Assert.assertTrue(edgeImitator.waitForMessages());
945 945
... ... @@ -978,7 +978,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
978 978 JsonNode attributesEntityData = mapper.readTree(attributesData);
979 979 EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, attributesEntityData);
980 980 edgeImitator.expectMessageAmount(1);
981   - edgeEventService.saveAsync(edgeEvent1);
  981 + edgeEventService.save(edgeEvent1);
982 982 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
983 983 Assert.assertTrue(edgeImitator.waitForMessages());
984 984
... ... @@ -1003,7 +1003,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1003 1003 JsonNode postAttributesEntityData = mapper.readTree(postAttributesData);
1004 1004 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.POST_ATTRIBUTES, device.getId().getId(), EdgeEventType.DEVICE, postAttributesEntityData);
1005 1005 edgeImitator.expectMessageAmount(1);
1006   - edgeEventService.saveAsync(edgeEvent);
  1006 + edgeEventService.save(edgeEvent);
1007 1007 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
1008 1008 Assert.assertTrue(edgeImitator.waitForMessages());
1009 1009
... ... @@ -1028,7 +1028,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1028 1028 JsonNode deleteAttributesEntityData = mapper.readTree(deleteAttributesData);
1029 1029 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_DELETED, device.getId().getId(), EdgeEventType.DEVICE, deleteAttributesEntityData);
1030 1030 edgeImitator.expectMessageAmount(1);
1031   - edgeEventService.saveAsync(edgeEvent);
  1031 + edgeEventService.save(edgeEvent);
1032 1032 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
1033 1033 Assert.assertTrue(edgeImitator.waitForMessages());
1034 1034
... ... @@ -1062,7 +1062,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1062 1062
1063 1063 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body);
1064 1064 edgeImitator.expectMessageAmount(1);
1065   - edgeEventService.saveAsync(edgeEvent);
  1065 + edgeEventService.save(edgeEvent);
1066 1066 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
1067 1067 Assert.assertTrue(edgeImitator.waitForMessages());
1068 1068
... ... @@ -1088,7 +1088,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1088 1088 JsonNode timeseriesEntityData = mapper.readTree(timeseriesData);
1089 1089 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED,
1090 1090 device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData);
1091   - edgeEventService.saveAsync(edgeEvent);
  1091 + edgeEventService.save(edgeEvent);
1092 1092 clusterService.onEdgeEventUpdate(tenantId, edge.getId());
1093 1093 }
1094 1094
... ...
... ... @@ -15,7 +15,6 @@
15 15 */
16 16 package org.thingsboard.server.dao.edge;
17 17
18   -import com.google.common.util.concurrent.ListenableFuture;
19 18 import org.thingsboard.server.common.data.edge.EdgeEvent;
20 19 import org.thingsboard.server.common.data.id.EdgeId;
21 20 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -24,7 +23,7 @@ import org.thingsboard.server.common.data.page.TimePageLink;
24 23
25 24 public interface EdgeEventService {
26 25
27   - ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent);
  26 + EdgeEvent save(EdgeEvent edgeEvent);
28 27
29 28 PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate);
30 29
... ...
... ... @@ -15,9 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.dao.edge;
17 17
18   -import com.google.common.util.concurrent.ListenableFuture;
19 18 import lombok.extern.slf4j.Slf4j;
20   -import org.apache.commons.lang3.StringUtils;
21 19 import org.springframework.beans.factory.annotation.Autowired;
22 20 import org.springframework.stereotype.Service;
23 21 import org.thingsboard.server.common.data.edge.EdgeEvent;
... ... @@ -36,9 +34,9 @@ public class BaseEdgeEventService implements EdgeEventService {
36 34 private EdgeEventDao edgeEventDao;
37 35
38 36 @Override
39   - public ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent) {
  37 + public EdgeEvent save(EdgeEvent edgeEvent) {
40 38 edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId);
41   - return edgeEventDao.saveAsync(edgeEvent);
  39 + return edgeEventDao.save(edgeEvent);
42 40 }
43 41
44 42 @Override
... ...
... ... @@ -30,12 +30,12 @@ import java.util.UUID;
30 30 public interface EdgeEventDao extends Dao<EdgeEvent> {
31 31
32 32 /**
33   - * Save or update edge event object async
  33 + * Save or update edge event object
34 34 *
35 35 * @param edgeEvent the event object
36 36 * @return saved edge event object future
37 37 */
38   - ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent);
  38 + EdgeEvent save(EdgeEvent edgeEvent);
39 39
40 40
41 41 /**
... ...
... ... @@ -41,6 +41,10 @@ import java.sql.ResultSet;
41 41 import java.sql.SQLException;
42 42 import java.util.Optional;
43 43 import java.util.UUID;
  44 +import java.util.concurrent.ConcurrentHashMap;
  45 +import java.util.concurrent.ConcurrentMap;
  46 +import java.util.concurrent.locks.Lock;
  47 +import java.util.concurrent.locks.ReentrantLock;
44 48
45 49 import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
46 50
... ... @@ -50,6 +54,8 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
50 54
51 55 private final UUID systemTenantId = NULL_UUID;
52 56
  57 + private final ConcurrentMap<EdgeId, Lock> readWriteLocks = new ConcurrentHashMap<>();
  58 +
53 59 @Autowired
54 60 private EdgeEventRepository edgeEventRepository;
55 61
... ... @@ -64,47 +70,59 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
64 70 }
65 71
66 72 @Override
67   - public ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent) {
68   - log.debug("Save edge event [{}] ", edgeEvent);
69   - if (edgeEvent.getId() == null) {
70   - UUID timeBased = Uuids.timeBased();
71   - edgeEvent.setId(new EdgeEventId(timeBased));
72   - edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased));
73   - } else if (edgeEvent.getCreatedTime() == 0L) {
74   - UUID eventId = edgeEvent.getId().getId();
75   - if (eventId.version() == 1) {
76   - edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId));
77   - } else {
78   - edgeEvent.setCreatedTime(System.currentTimeMillis());
  73 + public EdgeEvent save(EdgeEvent edgeEvent) {
  74 + final Lock readWriteLock = readWriteLocks.computeIfAbsent(edgeEvent.getEdgeId(), id -> new ReentrantLock());
  75 + readWriteLock.lock();
  76 + try {
  77 + log.debug("Save edge event [{}] ", edgeEvent);
  78 + if (edgeEvent.getId() == null) {
  79 + UUID timeBased = Uuids.timeBased();
  80 + edgeEvent.setId(new EdgeEventId(timeBased));
  81 + edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased));
  82 + } else if (edgeEvent.getCreatedTime() == 0L) {
  83 + UUID eventId = edgeEvent.getId().getId();
  84 + if (eventId.version() == 1) {
  85 + edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId));
  86 + } else {
  87 + edgeEvent.setCreatedTime(System.currentTimeMillis());
  88 + }
79 89 }
  90 + if (StringUtils.isEmpty(edgeEvent.getUid())) {
  91 + edgeEvent.setUid(edgeEvent.getId().toString());
  92 + }
  93 + return save(new EdgeEventEntity(edgeEvent)).orElse(null);
  94 + } finally {
  95 + readWriteLock.unlock();
80 96 }
81   - if (StringUtils.isEmpty(edgeEvent.getUid())) {
82   - edgeEvent.setUid(edgeEvent.getId().toString());
83   - }
84   - return service.submit(() -> save(new EdgeEventEntity(edgeEvent)).orElse(null));
85 97 }
86 98
87 99 @Override
88 100 public PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) {
89   - if (withTsUpdate) {
90   - return DaoUtil.toPageData(
91   - edgeEventRepository
92   - .findEdgeEventsByTenantIdAndEdgeId(
93   - tenantId,
94   - edgeId.getId(),
95   - pageLink.getStartTime(),
96   - pageLink.getEndTime(),
97   - DaoUtil.toPageable(pageLink)));
98   - } else {
99   - return DaoUtil.toPageData(
100   - edgeEventRepository
101   - .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(
102   - tenantId,
103   - edgeId.getId(),
104   - pageLink.getStartTime(),
105   - pageLink.getEndTime(),
106   - DaoUtil.toPageable(pageLink)));
  101 + final Lock readWriteLock = readWriteLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
  102 + readWriteLock.lock();
  103 + try {
  104 + if (withTsUpdate) {
  105 + return DaoUtil.toPageData(
  106 + edgeEventRepository
  107 + .findEdgeEventsByTenantIdAndEdgeId(
  108 + tenantId,
  109 + edgeId.getId(),
  110 + pageLink.getStartTime(),
  111 + pageLink.getEndTime(),
  112 + DaoUtil.toPageable(pageLink)));
  113 + } else {
  114 + return DaoUtil.toPageData(
  115 + edgeEventRepository
  116 + .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(
  117 + tenantId,
  118 + edgeId.getId(),
  119 + pageLink.getStartTime(),
  120 + pageLink.getEndTime(),
  121 + DaoUtil.toPageable(pageLink)));
107 122
  123 + }
  124 + } finally {
  125 + readWriteLock.unlock();
108 126 }
109 127 }
110 128
... ...
... ... @@ -42,7 +42,7 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
42 42 EdgeId edgeId = new EdgeId(Uuids.timeBased());
43 43 DeviceId deviceId = new DeviceId(Uuids.timeBased());
44 44 EdgeEvent edgeEvent = generateEdgeEvent(null, edgeId, deviceId, EdgeEventActionType.ADDED);
45   - EdgeEvent saved = edgeEventService.saveAsync(edgeEvent).get();
  45 + EdgeEvent saved = edgeEventService.save(edgeEvent);
46 46 Assert.assertEquals(saved.getTenantId(), edgeEvent.getTenantId());
47 47 Assert.assertEquals(saved.getEdgeId(), edgeEvent.getEdgeId());
48 48 Assert.assertEquals(saved.getEntityId(), edgeEvent.getEntityId());
... ... @@ -109,7 +109,7 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
109 109 TimePageLink pageLink = new TimePageLink(1);
110 110
111 111 EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED);
112   - edgeEventService.saveAsync(edgeEventWithTsUpdate).get();
  112 + edgeEventService.save(edgeEventWithTsUpdate);
113 113
114 114 PageData<EdgeEvent> allEdgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true);
115 115 PageData<EdgeEvent> edgeEventsWithoutTsUpdate = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false);
... ... @@ -124,6 +124,6 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
124 124 private EdgeEvent saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception {
125 125 EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED);
126 126 edgeEvent.setId(new EdgeEventId(Uuids.startOf(time)));
127   - return edgeEventService.saveAsync(edgeEvent).get();
  127 + return edgeEventService.save(edgeEvent);
128 128 }
129 129 }
\ No newline at end of file
... ...
... ... @@ -149,20 +149,9 @@ public class TbMsgPushToEdgeNode implements TbNode {
149 149
150 150 private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) {
151 151 edgeEvent.setEdgeId(edgeId);
152   - ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
153   - Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
154   - @Override
155   - public void onSuccess(@Nullable EdgeEvent event) {
156   - ctx.tellNext(msg, SUCCESS);
157   - ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
158   - }
159   -
160   - @Override
161   - public void onFailure(Throwable th) {
162   - log.warn("[{}] Can't save edge event [{}] for edge [{}]", ctx.getTenantId().getId(), edgeEvent, edgeId.getId(), th);
163   - ctx.tellFailure(msg, th);
164   - }
165   - }, ctx.getDbCallbackExecutor());
  152 + ctx.getEdgeEventService().save(edgeEvent);
  153 + ctx.tellNext(msg, SUCCESS);
  154 + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
166 155 }
167 156
168 157 private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) {
... ...