Commit 3a46cbe5d831aee37f890f31d91d83a206647927

Authored by Volodymyr Babak
1 parent 57adec67

Refactored edge event process. Code review fixes

... ... @@ -441,27 +441,41 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
441 441 private void processAlarm(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
442 442 AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
443 443 ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
444   - Futures.transform(alarmFuture, alarm -> {
445   - if (alarm != null) {
446   - EdgeEventType type = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
447   - if (type != null) {
448   - ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator());
449   - Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> {
450   - if (relatedEdgeIdsByEntityId != null) {
451   - for (EdgeId edgeId : relatedEdgeIdsByEntityId) {
452   - saveEdgeEvent(tenantId,
453   - edgeId,
454   - EdgeEventType.ALARM,
455   - EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
456   - alarmId,
457   - null);
  444 + Futures.addCallback(alarmFuture, new FutureCallback<Alarm>() {
  445 + @Override
  446 + public void onSuccess(@Nullable Alarm alarm) {
  447 + if (alarm != null) {
  448 + EdgeEventType type = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
  449 + if (type != null) {
  450 + ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator());
  451 + Futures.addCallback(relatedEdgeIdsByEntityIdFuture, new FutureCallback<List<EdgeId>>() {
  452 + @Override
  453 + public void onSuccess(@Nullable List<EdgeId> relatedEdgeIdsByEntityId) {
  454 + if (relatedEdgeIdsByEntityId != null) {
  455 + for (EdgeId edgeId : relatedEdgeIdsByEntityId) {
  456 + saveEdgeEvent(tenantId,
  457 + edgeId,
  458 + EdgeEventType.ALARM,
  459 + EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
  460 + alarmId,
  461 + null);
  462 + }
  463 + }
458 464 }
459   - }
460   - return null;
461   - }, dbCallbackExecutorService);
  465 +
  466 + @Override
  467 + public void onFailure(Throwable t) {
  468 + log.warn("[{}] can't find related edge ids by entity id [{}]", tenantId.getId(), alarm.getOriginator(), t);
  469 + }
  470 + }, dbCallbackExecutorService);
  471 + }
462 472 }
463 473 }
464   - return null;
  474 +
  475 + @Override
  476 + public void onFailure(Throwable t) {
  477 + log.warn("[{}] can't find alarm by id [{}]", tenantId.getId(), alarmId.getId(), t);
  478 + }
465 479 }, dbCallbackExecutorService);
466 480 }
467 481
... ... @@ -473,26 +487,34 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
473 487 futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getTo()));
474 488 futures.add(edgeService.findRelatedEdgeIdsByEntityId(tenantId, relation.getFrom()));
475 489 ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures);
476   - Futures.transform(combinedFuture, listOfListsEdgeIds -> {
477   - Set<EdgeId> uniqueEdgeIds = new HashSet<>();
478   - if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) {
479   - for (List<EdgeId> listOfListsEdgeId : listOfListsEdgeIds) {
480   - if (listOfListsEdgeId != null) {
481   - uniqueEdgeIds.addAll(listOfListsEdgeId);
  490 + Futures.addCallback(combinedFuture, new FutureCallback<List<List<EdgeId>>>() {
  491 + @Override
  492 + public void onSuccess(@Nullable List<List<EdgeId>> listOfListsEdgeIds) {
  493 + Set<EdgeId> uniqueEdgeIds = new HashSet<>();
  494 + if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) {
  495 + for (List<EdgeId> listOfListsEdgeId : listOfListsEdgeIds) {
  496 + if (listOfListsEdgeId != null) {
  497 + uniqueEdgeIds.addAll(listOfListsEdgeId);
  498 + }
482 499 }
483 500 }
484   - }
485   - if (!uniqueEdgeIds.isEmpty()) {
486   - for (EdgeId edgeId : uniqueEdgeIds) {
487   - saveEdgeEvent(tenantId,
488   - edgeId,
489   - EdgeEventType.RELATION,
490   - EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
491   - null,
492   - mapper.valueToTree(relation));
  501 + if (!uniqueEdgeIds.isEmpty()) {
  502 + for (EdgeId edgeId : uniqueEdgeIds) {
  503 + saveEdgeEvent(tenantId,
  504 + edgeId,
  505 + EdgeEventType.RELATION,
  506 + EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
  507 + null,
  508 + mapper.valueToTree(relation));
  509 + }
493 510 }
494 511 }
495   - return null;
  512 +
  513 + @Override
  514 + public void onFailure(Throwable t) {
  515 + log.warn("[{}] can't find related edge ids by relation to id [{}] and relation from id [{}]" ,
  516 + tenantId.getId(), relation.getTo().getId(), relation.getFrom().getId(), t);
  517 + }
496 518 }, dbCallbackExecutorService);
497 519 }
498 520 }
... ...
... ... @@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Autowired;
26 26 import org.springframework.beans.factory.annotation.Value;
27 27 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
28 28 import org.springframework.stereotype.Service;
  29 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
29 30 import org.thingsboard.server.common.data.DataConstants;
30 31 import org.thingsboard.server.common.data.edge.Edge;
31 32 import org.thingsboard.server.common.data.id.EdgeId;
... ... @@ -48,10 +49,12 @@ import java.io.File;
48 49 import java.io.IOException;
49 50 import java.util.Collections;
50 51 import java.util.Map;
  52 +import java.util.UUID;
51 53 import java.util.concurrent.ConcurrentHashMap;
52 54 import java.util.concurrent.ConcurrentMap;
53   -import java.util.concurrent.ExecutorService;
54 55 import java.util.concurrent.Executors;
  56 +import java.util.concurrent.ScheduledExecutorService;
  57 +import java.util.concurrent.ScheduledFuture;
55 58 import java.util.concurrent.TimeUnit;
56 59
57 60 @Service
... ... @@ -62,6 +65,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
62 65
63 66 private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
64 67 private final ConcurrentMap<EdgeId, Boolean> sessionNewEvents = new ConcurrentHashMap<>();
  68 + private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>();
65 69 private static final ObjectMapper mapper = new ObjectMapper();
66 70
67 71 @Value("${edges.rpc.port}")
... ... @@ -77,6 +81,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
77 81 @Value("${edges.rpc.client_max_keep_alive_time_sec}")
78 82 private int clientMaxKeepAliveTimeSec;
79 83
  84 + @Value("${edges.scheduler_pool_size}")
  85 + private int schedulerPoolSize;
  86 +
80 87 @Autowired
81 88 private EdgeContextComponent ctx;
82 89
... ... @@ -85,7 +92,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
85 92
86 93 private Server server;
87 94
88   - private ExecutorService executor;
  95 + private ScheduledExecutorService scheduler;
89 96
90 97 @PostConstruct
91 98 public void init() {
... ... @@ -112,8 +119,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
112 119 throw new RuntimeException("Failed to start Edge RPC server!");
113 120 }
114 121 log.info("Edge RPC service initialized!");
115   - executor = Executors.newSingleThreadExecutor();
116   - processHandleMessages();
  122 + this.scheduler = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler"));
117 123 }
118 124
119 125 @PreDestroy
... ... @@ -121,8 +127,16 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
121 127 if (server != null) {
122 128 server.shutdownNow();
123 129 }
124   - if (executor != null) {
125   - executor.shutdownNow();
  130 + for (Map.Entry<EdgeId, ScheduledFuture<?>> entry : sessionEdgeEventChecks.entrySet()) {
  131 + EdgeId edgeId = entry.getKey();
  132 + ScheduledFuture<?> sessionEdgeEventCheck = entry.getValue();
  133 + if (sessionEdgeEventCheck != null && !sessionEdgeEventCheck.isCancelled() && !sessionEdgeEventCheck.isDone()) {
  134 + sessionEdgeEventCheck.cancel(true);
  135 + sessionEdgeEventChecks.remove(edgeId);
  136 + }
  137 + }
  138 + if (scheduler != null) {
  139 + scheduler.shutdownNow();
126 140 }
127 141 }
128 142
... ... @@ -150,6 +164,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
150 164 session.close();
151 165 sessions.remove(edgeId);
152 166 sessionNewEvents.remove(edgeId);
  167 + cancelScheduleEdgeEventsCheck(edgeId);
153 168 }
154 169 }
155 170
... ... @@ -168,6 +183,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
168 183 sessionNewEvents.put(edgeId, false);
169 184 save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true);
170 185 save(edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, System.currentTimeMillis());
  186 + scheduleEdgeEventsCheck(edgeGrpcSession);
171 187 }
172 188
173 189 public EdgeGrpcSession getEdgeGrpcSessionById(EdgeId edgeId) {
... ... @@ -179,38 +195,38 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
179 195 }
180 196 }
181 197
182   - private void processHandleMessages() {
183   - executor.submit(() -> {
184   - while (!Thread.interrupted()) {
  198 + private void scheduleEdgeEventsCheck(EdgeGrpcSession session) {
  199 + EdgeId edgeId = session.getEdge().getId();
  200 + UUID tenantId = session.getEdge().getTenantId().getId();
  201 + if (sessions.containsKey(edgeId)) {
  202 + ScheduledFuture<?> schedule = scheduler.schedule(() -> {
185 203 try {
186   - if (sessions.size() > 0) {
187   - for (Map.Entry<EdgeId, EdgeGrpcSession> entry : sessions.entrySet()) {
188   - EdgeId edgeId = entry.getKey();
189   - EdgeGrpcSession session = entry.getValue();
190   - if (sessionNewEvents.get(edgeId)) {
191   - log.trace("[{}] set session new events flag to false", edgeId.getId());
192   - sessionNewEvents.put(edgeId, false);
193   - // TODO: voba - at the moment all edge events are processed in a single thread. Maybe this should be updated?
194   - session.processHandleMessages();
195   - }
196   - }
197   - } else {
198   - log.trace("No sessions available");
199   - }
200   - log.trace("Sleep for the next run");
201   - try {
202   - Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
203   - } catch (InterruptedException ignore) {
  204 + if (sessionNewEvents.get(edgeId)) {
  205 + log.trace("[{}] Set session new events flag to false", edgeId.getId());
  206 + sessionNewEvents.put(edgeId, false);
  207 + session.processEdgeEvents();
204 208 }
205 209 } catch (Exception e) {
206   - log.warn("Failed to process messages handling!", e);
207   - try {
208   - Thread.sleep(1000);
209   - } catch (InterruptedException ignore) {
210   - }
  210 + log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), e);
211 211 }
  212 + scheduleEdgeEventsCheck(session);
  213 + }, ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval(), TimeUnit.MILLISECONDS);
  214 + sessionEdgeEventChecks.put(edgeId, schedule);
  215 + log.trace("[{}] Check edge event was scheduler for edge [{}]", tenantId, edgeId.getId());
  216 + } else {
  217 + log.debug("[{}] Session was removed and edge event check schedule must not be started [{}]",
  218 + tenantId, edgeId.getId());
  219 + }
  220 + }
  221 +
  222 + private void cancelScheduleEdgeEventsCheck(EdgeId edgeId) {
  223 + if (sessionEdgeEventChecks.containsKey(edgeId)) {
  224 + ScheduledFuture<?> sessionEdgeEventCheck = sessionEdgeEventChecks.get(edgeId);
  225 + if (sessionEdgeEventCheck != null && !sessionEdgeEventCheck.isCancelled() && !sessionEdgeEventCheck.isDone()) {
  226 + sessionEdgeEventCheck.cancel(true);
  227 + sessionEdgeEventChecks.remove(edgeId);
212 228 }
213   - });
  229 + }
214 230 }
215 231
216 232 private void onEdgeDisconnect(EdgeId edgeId) {
... ... @@ -219,6 +235,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
219 235 sessionNewEvents.remove(edgeId);
220 236 save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false);
221 237 save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, System.currentTimeMillis());
  238 + cancelScheduleEdgeEventsCheck(edgeId);
222 239 }
223 240
224 241 private void save(EdgeId edgeId, String key, long value) {
... ...
... ... @@ -258,7 +258,7 @@ public final class EdgeGrpcSession implements Closeable {
258 258 }
259 259 }
260 260
261   - void processHandleMessages() throws ExecutionException, InterruptedException {
  261 + void processEdgeEvents() throws ExecutionException, InterruptedException {
262 262 log.trace("[{}] processHandleMessages started", this.sessionId);
263 263 if (isConnected()) {
264 264 Long queueStartTs = getQueueStartTs().get();
... ...
... ... @@ -545,34 +545,47 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
545 545 futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.FROM));
546 546 futures.add(findRelationByQuery(edge, entityId, EntitySearchDirection.TO));
547 547 ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures);
548   - return Futures.transform(relationsListFuture, relationsList -> {
549   - try {
550   - if (relationsList != null && !relationsList.isEmpty()) {
551   - for (List<EntityRelation> entityRelations : relationsList) {
552   - log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size());
553   - for (EntityRelation relation : entityRelations) {
554   - try {
555   - if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
556   - !relation.getTo().getEntityType().equals(EntityType.EDGE)) {
557   - saveEdgeEvent(edge.getTenantId(),
558   - edge.getId(),
559   - EdgeEventType.RELATION,
560   - EdgeEventActionType.ADDED,
561   - null,
562   - mapper.valueToTree(relation));
  548 + SettableFuture<Void> futureToSet = SettableFuture.create();
  549 + Futures.addCallback(relationsListFuture, new FutureCallback<List<List<EntityRelation>>>() {
  550 + @Override
  551 + public void onSuccess(@Nullable List<List<EntityRelation>> relationsList) {
  552 + try {
  553 + if (relationsList != null && !relationsList.isEmpty()) {
  554 + for (List<EntityRelation> entityRelations : relationsList) {
  555 + log.trace("[{}] [{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), entityId, entityRelations.size());
  556 + for (EntityRelation relation : entityRelations) {
  557 + try {
  558 + if (!relation.getFrom().getEntityType().equals(EntityType.EDGE) &&
  559 + !relation.getTo().getEntityType().equals(EntityType.EDGE)) {
  560 + saveEdgeEvent(edge.getTenantId(),
  561 + edge.getId(),
  562 + EdgeEventType.RELATION,
  563 + EdgeEventActionType.ADDED,
  564 + null,
  565 + mapper.valueToTree(relation));
  566 + }
  567 + } catch (Exception e) {
  568 + log.error("Exception during loading relation [{}] to edge on sync!", relation, e);
  569 + futureToSet.setException(e);
  570 + return;
  571 + }
  572 + }
563 573 }
564   - } catch (Exception e) {
565   - log.error("Exception during loading relation [{}] to edge on sync!", relation, e);
566 574 }
  575 + futureToSet.set(null);
  576 + } catch (Exception e) {
  577 + log.error("Exception during loading relation(s) to edge on sync!", e);
  578 + futureToSet.setException(e);
567 579 }
568 580 }
569   - }
570   - } catch (Exception e) {
571   - log.error("Exception during loading relation(s) to edge on sync!", e);
572   - throw new RuntimeException("Exception during loading relation(s) to edge on sync!", e);
573   - }
574   - return null;
575   - }, dbCallbackExecutorService);
  581 +
  582 + @Override
  583 + public void onFailure(Throwable t) {
  584 + log.error("[{}] Can't find relation by query. Entity id [{}]", edge.getTenantId(), entityId, t);
  585 + futureToSet.setException(t);
  586 + }
  587 + }, dbCallbackExecutorService);
  588 + return futureToSet;
576 589 }
577 590
578 591 private ListenableFuture<List<EntityRelation>> findRelationByQuery(Edge edge, EntityId entityId, EntitySearchDirection direction) {
... ...
... ... @@ -602,6 +602,7 @@ edges:
602 602 max_read_records_count: "${EDGES_RPC_STORAGE_MAX_READ_RECORDS_COUNT:50}"
603 603 no_read_records_sleep: "${EDGES_RPC_NO_READ_RECORDS_SLEEP:1000}"
604 604 sleep_between_batches: "${EDGES_RPC_SLEEP_BETWEEN_BATCHES:1000}"
  605 + scheduler_pool_size: "${EDGES_SCHEDULER_POOL_SIZE:4}"
605 606 edge_events_ttl: "${EDGES_EDGE_EVENTS_TTL:0}"
606 607 state:
607 608 persistToTelemetry: "${EDGES_PERSIST_STATE_TO_TELEMETRY:false}"
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.dao.edge;
17 17
18 18 import com.google.common.base.Function;
  19 +import com.google.common.util.concurrent.FutureCallback;
19 20 import com.google.common.util.concurrent.Futures;
20 21 import com.google.common.util.concurrent.ListenableFuture;
21 22 import com.google.common.util.concurrent.MoreExecutors;
... ... @@ -324,13 +325,20 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
324 325 public void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId) {
325 326 log.trace("Executing assignDefaultRuleChainsToEdge, tenantId [{}], edgeId [{}]", tenantId, edgeId);
326 327 ListenableFuture<List<RuleChain>> future = ruleChainService.findDefaultEdgeRuleChainsByTenantId(tenantId);
327   - Futures.transform(future, ruleChains -> {
328   - if (ruleChains != null && !ruleChains.isEmpty()) {
329   - for (RuleChain ruleChain : ruleChains) {
330   - ruleChainService.assignRuleChainToEdge(tenantId, ruleChain.getId(), edgeId);
  328 + Futures.addCallback(future, new FutureCallback<List<RuleChain>>() {
  329 + @Override
  330 + public void onSuccess(List<RuleChain> ruleChains) {
  331 + if (ruleChains != null && !ruleChains.isEmpty()) {
  332 + for (RuleChain ruleChain : ruleChains) {
  333 + ruleChainService.assignRuleChainToEdge(tenantId, ruleChain.getId(), edgeId);
  334 + }
331 335 }
332 336 }
333   - return null;
  337 +
  338 + @Override
  339 + public void onFailure(Throwable t) {
  340 + log.warn("[{}] can't find default edge rule chains [{}]", tenantId.getId(), edgeId.getId(), t);
  341 + }
334 342 }, MoreExecutors.directExecutor());
335 343 }
336 344
... ...