Commit 193cccf1232d9d3117f98cb03bde603c396aa3ed

Authored by Volodymyr Babak
1 parent 36f6ad2a

Refactored Edge session message poll mechanism. Removed tech dept - Integer.MAX_…

…INTEGER. Code refactoring
Showing 25 changed files with 448 additions and 116 deletions
... ... @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
34 34 import org.thingsboard.server.common.msg.MsgType;
35 35 import org.thingsboard.server.common.msg.TbActorMsg;
36 36 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
  37 +import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
37 38 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
38 39 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
39 40 import org.thingsboard.server.common.msg.queue.RuleEngineException;
... ... @@ -93,6 +94,9 @@ public class AppActor extends ContextAwareActor {
93 94 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
94 95 onToDeviceActorMsg((TenantAwareMsg) msg, true);
95 96 break;
  97 + case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
  98 + onToTenantActorMsg((EdgeEventUpdateMsg) msg);
  99 + break;
96 100 default:
97 101 return false;
98 102 }
... ... @@ -186,6 +190,20 @@ public class AppActor extends ContextAwareActor {
186 190 () -> new TenantActor.ActorCreator(systemContext, tenantId));
187 191 }
188 192
  193 + private void onToTenantActorMsg(EdgeEventUpdateMsg msg) {
  194 + TbActorRef target = null;
  195 + if (SYSTEM_TENANT.equals(msg.getTenantId())) {
  196 + log.warn("Message has system tenant id: {}", msg);
  197 + } else {
  198 + target = getOrCreateTenantActor(msg.getTenantId());
  199 + }
  200 + if (target != null) {
  201 + target.tellWithHighPriority(msg);
  202 + } else {
  203 + log.debug("[{}] Invalid edge event update msg: {}", msg.getTenantId(), msg);
  204 + }
  205 + }
  206 +
189 207 public static class ActorCreator extends ContextBasedCreator {
190 208
191 209 public ActorCreator(ActorSystemContext context) {
... ...
... ... @@ -145,8 +145,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
145 145 if (result != null && result.size() > 0) {
146 146 EntityRelation relationToEdge = result.get(0);
147 147 if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) {
  148 + log.trace("[{}][{}] found edge [{}] for device", tenantId, deviceId, relationToEdge.getFrom().getId());
148 149 return new EdgeId(relationToEdge.getFrom().getId());
  150 + } else {
  151 + log.trace("[{}][{}] edge relation is empty {}", tenantId, deviceId, relationToEdge);
149 152 }
  153 + } else {
  154 + log.trace("[{}][{}] device doesn't have any related edge", tenantId, deviceId);
150 155 }
151 156 return null;
152 157 }
... ... @@ -165,6 +170,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
165 170
166 171 boolean sent;
167 172 if (systemContext.isEdgesRpcEnabled() && edgeId != null) {
  173 + log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId());
168 174 saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
169 175 sent = true;
170 176 } else {
... ... @@ -516,6 +522,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
516 522 }
517 523
518 524 void processEdgeUpdate(DeviceEdgeUpdateMsg msg) {
  525 + log.trace("[{}] Processing edge update {}", deviceId, msg);
519 526 this.edgeId = msg.getEdgeId();
520 527 }
521 528
... ... @@ -568,7 +575,18 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
568 575 edgeEvent.setBody(body);
569 576
570 577 edgeEvent.setEdgeId(edgeId);
571   - systemContext.getEdgeEventService().saveAsync(edgeEvent);
  578 + ListenableFuture<EdgeEvent> future = systemContext.getEdgeEventService().saveAsync(edgeEvent);
  579 + Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
  580 + @Override
  581 + public void onSuccess( EdgeEvent result) {
  582 + systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
  583 + }
  584 +
  585 + @Override
  586 + public void onFailure(Throwable t) {
  587 + log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
  588 + }
  589 + }, systemContext.getDbCallbackExecutor());
572 590 }
573 591
574 592 private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
... ...
... ... @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.DataConstants;
36 36 import org.thingsboard.server.common.data.Device;
37 37 import org.thingsboard.server.common.data.alarm.Alarm;
38 38 import org.thingsboard.server.common.data.asset.Asset;
  39 +import org.thingsboard.server.common.data.id.EdgeId;
39 40 import org.thingsboard.server.common.data.id.EntityId;
40 41 import org.thingsboard.server.common.data.id.RuleChainId;
41 42 import org.thingsboard.server.common.data.id.RuleNodeId;
... ... @@ -269,6 +270,11 @@ class DefaultTbContext implements TbContext {
269 270 return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action);
270 271 }
271 272
  273 + @Override
  274 + public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
  275 + mainCtx.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
  276 + }
  277 +
272 278 public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action) {
273 279 try {
274 280 return TbMsg.newMsg(action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)));
... ...
... ... @@ -45,6 +45,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
45 45 import org.thingsboard.server.common.msg.TbMsg;
46 46 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
47 47 import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
  48 +import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
48 49 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
49 50 import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
50 51 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
... ... @@ -157,6 +158,9 @@ public class TenantActor extends RuleChainManagerActor {
157 158 case RULE_CHAIN_TO_RULE_CHAIN_MSG:
158 159 onRuleChainMsg((RuleChainAwareMsg) msg);
159 160 break;
  161 + case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
  162 + onToEdgeSessionMsg((EdgeEventUpdateMsg) msg);
  163 + break;
160 164 default:
161 165 return false;
162 166 }
... ... @@ -242,6 +246,11 @@ public class TenantActor extends RuleChainManagerActor {
242 246 () -> new DeviceActorCreator(systemContext, tenantId, deviceId));
243 247 }
244 248
  249 + private void onToEdgeSessionMsg(EdgeEventUpdateMsg msg) {
  250 + log.trace("[{}] onToEdgeSessionMsg [{}]", msg.getTenantId(), msg);
  251 + systemContext.getEdgeRpcService().onEdgeEvent(msg.getEdgeId());
  252 + }
  253 +
245 254 public static class ActorCreator extends ContextBasedCreator {
246 255
247 256 private final TenantId tenantId;
... ...
... ... @@ -832,6 +832,7 @@ public abstract class BaseController {
832 832 builder.setBody(body);
833 833 }
834 834 TransportProtos.EdgeNotificationMsgProto msg = builder.build();
  835 + log.trace("[{}] sending notification to edge service {}", tenantId.getId(), msg);
835 836 tbClusterService.pushMsgToCore(tenantId, entityId != null ? entityId : tenantId,
836 837 TransportProtos.ToCoreMsg.newBuilder().setEdgeNotificationMsg(msg).build(), null);
837 838 }
... ...
... ... @@ -251,12 +251,11 @@ public class RuleChainController extends BaseController {
251 251 try {
252 252 TenantId tenantId = getCurrentUser().getTenantId();
253 253 TextPageLink pageLink = createPageLink(limit, textSearch, idOffset, textOffset);
  254 + RuleChainType type = RuleChainType.CORE;
254 255 if (typeStr != null && typeStr.trim().length() > 0) {
255   - RuleChainType type = RuleChainType.valueOf(typeStr);
256   - return checkNotNull(ruleChainService.findTenantRuleChainsByType(tenantId, type, pageLink));
257   - } else {
258   - return checkNotNull(ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, pageLink));
  256 + type = RuleChainType.valueOf(typeStr);
259 257 }
  258 + return checkNotNull(ruleChainService.findTenantRuleChainsByType(tenantId, type, pageLink));
260 259 } catch (Exception e) {
261 260 throw handleException(e);
262 261 }
... ...
... ... @@ -55,6 +55,7 @@ import org.thingsboard.server.dao.user.UserService;
55 55 import org.thingsboard.server.gen.transport.TransportProtos;
56 56 import org.thingsboard.server.queue.util.TbCoreComponent;
57 57 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
  58 +import org.thingsboard.server.service.queue.TbClusterService;
58 59
59 60 import javax.annotation.PostConstruct;
60 61 import javax.annotation.PreDestroy;
... ... @@ -74,6 +75,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
74 75
75 76 private static final ObjectMapper mapper = new ObjectMapper();
76 77
  78 + private static final int DEFAULT_LIMIT = 100;
  79 +
77 80 @Autowired
78 81 private EdgeService edgeService;
79 82
... ... @@ -90,6 +93,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
90 93 private EdgeEventService edgeEventService;
91 94
92 95 @Autowired
  96 + private TbClusterService clusterService;
  97 +
  98 + @Autowired
93 99 private DbCallbackExecutorService dbCallbackExecutorService;
94 100
95 101 private ExecutorService tsCallBackExecutor;
... ... @@ -137,7 +143,19 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
137 143 edgeEvent.setEntityId(entityId.getId());
138 144 }
139 145 edgeEvent.setBody(body);
140   - edgeEventService.saveAsync(edgeEvent);
  146 + ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
  147 + Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
  148 + @Override
  149 + public void onSuccess(@Nullable EdgeEvent result) {
  150 + clusterService.onEdgeEventUpdate(tenantId, edgeId);
  151 + }
  152 +
  153 + @Override
  154 + public void onFailure(Throwable t) {
  155 + log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
  156 + }
  157 + }, dbCallbackExecutorService);
  158 +
141 159 }
142 160
143 161 @Override
... ... @@ -195,13 +213,20 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
195 213 public void onSuccess(@Nullable Edge edge) {
196 214 if (edge != null && !customerId.isNullUid()) {
197 215 saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null);
198   - TextPageData<User> pageData = userService.findCustomerUsers(tenantId, customerId, new TextPageLink(Integer.MAX_VALUE));
199   - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
200   - log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size());
201   - for (User user : pageData.getData()) {
202   - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null);
  216 + TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
  217 + TextPageData<User> pageData;
  218 + do {
  219 + pageData = userService.findCustomerUsers(tenantId, customerId, pageLink);
  220 + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
  221 + log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size());
  222 + for (User user : pageData.getData()) {
  223 + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null);
  224 + }
  225 + if (pageData.hasNext()) {
  226 + pageLink = pageData.getNextPageLink();
  227 + }
203 228 }
204   - }
  229 + } while (pageData != null && pageData.hasNext());
205 230 }
206 231 }
207 232
... ... @@ -242,12 +267,19 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
242 267 case ADDED:
243 268 case UPDATED:
244 269 case DELETED:
245   - TextPageData<Edge> edgesByTenantId = edgeService.findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
246   - if (edgesByTenantId != null && edgesByTenantId.getData() != null && !edgesByTenantId.getData().isEmpty()) {
247   - for (Edge edge : edgesByTenantId.getData()) {
248   - saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null);
  270 + TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
  271 + TextPageData<Edge> pageData;
  272 + do {
  273 + pageData = edgeService.findEdgesByTenantId(tenantId, pageLink);
  274 + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
  275 + for (Edge edge : pageData.getData()) {
  276 + saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null);
  277 + }
  278 + if (pageData.hasNext()) {
  279 + pageLink = pageData.getNextPageLink();
  280 + }
249 281 }
250   - }
  282 + } while (pageData != null && pageData.hasNext());
251 283 break;
252 284 }
253 285 }
... ... @@ -256,21 +288,28 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
256 288 EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
257 289 EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
258 290 EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
259   - TextPageData<Edge> edgesByTenantId = edgeService.findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
260   - if (edgesByTenantId != null && edgesByTenantId.getData() != null && !edgesByTenantId.getData().isEmpty()) {
261   - for (Edge edge : edgesByTenantId.getData()) {
262   - switch (actionType) {
263   - case UPDATED:
264   - if (!edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(entityId)) {
  291 + TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
  292 + TextPageData<Edge> pageData;
  293 + do {
  294 + pageData = edgeService.findEdgesByTenantId(tenantId, pageLink);
  295 + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
  296 + for (Edge edge : pageData.getData()) {
  297 + switch (actionType) {
  298 + case UPDATED:
  299 + if (!edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(entityId)) {
  300 + saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null);
  301 + }
  302 + break;
  303 + case DELETED:
265 304 saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null);
266   - }
267   - break;
268   - case DELETED:
269   - saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null);
270   - break;
  305 + break;
  306 + }
  307 + }
  308 + if (pageData.hasNext()) {
  309 + pageLink = pageData.getNextPageLink();
271 310 }
272 311 }
273   - }
  312 + } while (pageData != null && pageData.hasNext());
274 313 }
275 314
276 315 private void processEntity(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
... ... @@ -337,26 +376,33 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
337 376 }, dbCallbackExecutorService);
338 377 break;
339 378 case DELETED:
340   - TextPageData<Edge> edgesByTenantId = edgeService.findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
341   - if (edgesByTenantId != null && edgesByTenantId.getData() != null && !edgesByTenantId.getData().isEmpty()) {
342   - for (Edge edge : edgesByTenantId.getData()) {
343   - saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null);
  379 + TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
  380 + TextPageData<Edge> pageData;
  381 + do {
  382 + pageData = edgeService.findEdgesByTenantId(tenantId, pageLink);
  383 + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
  384 + for (Edge edge : pageData.getData()) {
  385 + saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null);
  386 + }
  387 + if (pageData.hasNext()) {
  388 + pageLink = pageData.getNextPageLink();
  389 + }
344 390 }
345   - }
  391 + } while (pageData != null && pageData.hasNext());
346 392 break;
347 393 case ASSIGNED_TO_EDGE:
348 394 case UNASSIGNED_FROM_EDGE:
349 395 EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
350 396 saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, null);
351 397 if (type.equals(EdgeEventType.RULE_CHAIN)) {
352   - updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId);
  398 + updateDependentRuleChains(tenantId, new RuleChainId(entityId.getId()), edgeId, new TimePageLink(DEFAULT_LIMIT));
353 399 }
354 400 break;
355 401 }
356 402 }
357 403
358   - private void updateDependentRuleChains(TenantId tenantId, RuleChainId processingRuleChainId, EdgeId edgeId) {
359   - ListenableFuture<TimePageData<RuleChain>> future = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edgeId, new TimePageLink(Integer.MAX_VALUE));
  404 + private void updateDependentRuleChains(TenantId tenantId, RuleChainId processingRuleChainId, EdgeId edgeId, TimePageLink pageLink) {
  405 + ListenableFuture<TimePageData<RuleChain>> future = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edgeId, pageLink);
360 406 Futures.addCallback(future, new FutureCallback<TimePageData<RuleChain>>() {
361 407 @Override
362 408 public void onSuccess(@Nullable TimePageData<RuleChain> pageData) {
... ... @@ -379,6 +425,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
379 425 }
380 426 }
381 427 }
  428 + if (pageData.hasNext()) {
  429 + updateDependentRuleChains(tenantId, processingRuleChainId, edgeId, pageData.getNextPageLink());
  430 + }
382 431 }
383 432 }
384 433
... ...
... ... @@ -49,6 +49,7 @@ import java.io.IOException;
49 49 import java.util.Collections;
50 50 import java.util.Map;
51 51 import java.util.concurrent.ConcurrentHashMap;
  52 +import java.util.concurrent.ConcurrentMap;
52 53 import java.util.concurrent.ExecutorService;
53 54 import java.util.concurrent.Executors;
54 55 import java.util.concurrent.TimeUnit;
... ... @@ -59,7 +60,8 @@ import java.util.concurrent.TimeUnit;
59 60 @TbCoreComponent
60 61 public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService {
61 62
62   - private final Map<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
  63 + private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>();
  64 + private final ConcurrentMap<EdgeId, Boolean> sessionNewEvents = new ConcurrentHashMap<>();
63 65 private static final ObjectMapper mapper = new ObjectMapper();
64 66
65 67 @Value("${edges.rpc.port}")
... ... @@ -147,12 +149,23 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
147 149 log.debug("Closing and removing session for edge [{}]", edgeId);
148 150 session.close();
149 151 sessions.remove(edgeId);
  152 + sessionNewEvents.remove(edgeId);
  153 + }
  154 + }
  155 +
  156 + @Override
  157 + public void onEdgeEvent(EdgeId edgeId) {
  158 + log.trace("[{}] onEdgeEvent", edgeId.getId());
  159 + if (!sessionNewEvents.get(edgeId)) {
  160 + log.trace("[{}] set session new events flag to true", edgeId.getId());
  161 + sessionNewEvents.put(edgeId, true);
150 162 }
151 163 }
152 164
153 165 private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
154 166 log.debug("[{}] onEdgeConnect [{}]", edgeId, edgeGrpcSession.getSessionId());
155 167 sessions.put(edgeId, edgeGrpcSession);
  168 + sessionNewEvents.put(edgeId, false);
156 169 save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true);
157 170 save(edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, System.currentTimeMillis());
158 171 }
... ... @@ -171,15 +184,23 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
171 184 while (!Thread.interrupted()) {
172 185 try {
173 186 if (sessions.size() > 0) {
174   - for (EdgeGrpcSession session : sessions.values()) {
175   - session.processHandleMessages();
  187 + for (Map.Entry<EdgeId, EdgeGrpcSession> entry : sessions.entrySet()) {
  188 + EdgeId edgeId = entry.getKey();
  189 + EdgeGrpcSession session = entry.getValue();
  190 + if (sessionNewEvents.get(edgeId)) {
  191 + log.trace("[{}] set session new events flag to false", edgeId.getId());
  192 + sessionNewEvents.put(edgeId, false);
  193 + // TODO: voba - at the moment all edge events are processed in a single thread. Maybe this should be updated?
  194 + session.processHandleMessages();
  195 + }
176 196 }
177 197 } else {
178   - log.trace("No sessions available, sleep for the next run");
179   - try {
180   - Thread.sleep(1000);
181   - } catch (InterruptedException ignore) {
182   - }
  198 + log.trace("No sessions available");
  199 + }
  200 + log.trace("Sleep for the next run");
  201 + try {
  202 + Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
  203 + } catch (InterruptedException ignore) {
183 204 }
184 205 } catch (Exception e) {
185 206 log.warn("Failed to process messages handling!", e);
... ... @@ -195,6 +216,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
195 216 private void onEdgeDisconnect(EdgeId edgeId) {
196 217 log.debug("[{}] onEdgeDisconnect", edgeId);
197 218 sessions.remove(edgeId);
  219 + sessionNewEvents.remove(edgeId);
198 220 save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false);
199 221 save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, System.currentTimeMillis());
200 222 }
... ...
... ... @@ -304,11 +304,6 @@ public final class EdgeGrpcSession implements Closeable {
304 304 Long newStartTs = UUIDs.unixTimestamp(ifOffset);
305 305 updateQueueStartTs(newStartTs);
306 306 }
307   - try {
308   - Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
309   - } catch (InterruptedException e) {
310   - log.error("[{}] Error during sleep between no records interval", this.sessionId, e);
311   - }
312 307 }
313 308 log.trace("[{}] processHandleMessages finished", this.sessionId);
314 309 }
... ...
... ... @@ -23,4 +23,6 @@ public interface EdgeRpcService {
23 23 void updateEdge(Edge edge);
24 24
25 25 void deleteEdge(EdgeId edgeId);
  26 +
  27 + void onEdgeEvent(EdgeId edgeId);
26 28 }
... ...
... ... @@ -82,6 +82,7 @@ import org.thingsboard.server.gen.edge.RelationRequestMsg;
82 82 import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
83 83 import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg;
84 84 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
  85 +import org.thingsboard.server.service.queue.TbClusterService;
85 86
86 87 import java.io.File;
87 88 import java.nio.charset.StandardCharsets;
... ... @@ -99,6 +100,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
99 100
100 101 private static final ObjectMapper mapper = new ObjectMapper();
101 102
  103 + private static final int DEFAULT_LIMIT = 100;
  104 +
102 105 @Autowired
103 106 private EdgeEventService edgeEventService;
104 107
... ... @@ -138,28 +141,31 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
138 141 @Autowired
139 142 private DbCallbackExecutorService dbCallbackExecutorService;
140 143
  144 + @Autowired
  145 + private TbClusterService tbClusterService;
  146 +
141 147 @Override
142 148 public void sync(Edge edge) {
143 149 log.trace("[{}] staring sync process for edge [{}]", edge.getTenantId(), edge.getName());
144 150 try {
145 151 syncWidgetsBundleAndWidgetTypes(edge);
146 152 syncAdminSettings(edge);
147   - syncRuleChains(edge);
  153 + syncRuleChains(edge, new TimePageLink(DEFAULT_LIMIT));
148 154 syncUsers(edge);
149   - syncDevices(edge);
150   - syncAssets(edge);
151   - syncEntityViews(edge);
152   - syncDashboards(edge);
  155 + syncDevices(edge, new TimePageLink(DEFAULT_LIMIT));
  156 + syncAssets(edge, new TimePageLink(DEFAULT_LIMIT));
  157 + syncEntityViews(edge, new TimePageLink(DEFAULT_LIMIT));
  158 + syncDashboards(edge, new TimePageLink(DEFAULT_LIMIT));
153 159 } catch (Exception e) {
154 160 log.error("Exception during sync process", e);
155 161 }
156 162 }
157 163
158   - private void syncRuleChains(Edge edge) {
159   - log.trace("[{}] syncRuleChains [{}]", edge.getTenantId(), edge.getName());
  164 + private void syncRuleChains(Edge edge, TimePageLink pageLink) {
  165 + log.trace("[{}] syncRuleChains [{}] [{}]", edge.getTenantId(), edge.getName(), pageLink);
160 166 try {
161 167 ListenableFuture<TimePageData<RuleChain>> future =
162   - ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE));
  168 + ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
163 169 Futures.addCallback(future, new FutureCallback<TimePageData<RuleChain>>() {
164 170 @Override
165 171 public void onSuccess(@Nullable TimePageData<RuleChain> pageData) {
... ... @@ -168,6 +174,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
168 174 for (RuleChain ruleChain : pageData.getData()) {
169 175 saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.ADDED, ruleChain.getId(), null);
170 176 }
  177 + if (pageData.hasNext()) {
  178 + syncRuleChains(edge, pageData.getNextPageLink());
  179 + }
171 180 }
172 181 }
173 182
... ... @@ -181,11 +190,11 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
181 190 }
182 191 }
183 192
184   - private void syncDevices(Edge edge) {
  193 + private void syncDevices(Edge edge, TimePageLink pageLink) {
185 194 log.trace("[{}] syncDevices [{}]", edge.getTenantId(), edge.getName());
186 195 try {
187 196 ListenableFuture<TimePageData<Device>> future =
188   - deviceService.findDevicesByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE));
  197 + deviceService.findDevicesByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
189 198 Futures.addCallback(future, new FutureCallback<TimePageData<Device>>() {
190 199 @Override
191 200 public void onSuccess(@Nullable TimePageData<Device> pageData) {
... ... @@ -194,6 +203,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
194 203 for (Device device : pageData.getData()) {
195 204 saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null);
196 205 }
  206 + if (pageData.hasNext()) {
  207 + syncDevices(edge, pageData.getNextPageLink());
  208 + }
197 209 }
198 210 }
199 211
... ... @@ -207,10 +219,10 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
207 219 }
208 220 }
209 221
210   - private void syncAssets(Edge edge) {
  222 + private void syncAssets(Edge edge, TimePageLink pageLink) {
211 223 log.trace("[{}] syncAssets [{}]", edge.getTenantId(), edge.getName());
212 224 try {
213   - ListenableFuture<TimePageData<Asset>> future = assetService.findAssetsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE));
  225 + ListenableFuture<TimePageData<Asset>> future = assetService.findAssetsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
214 226 Futures.addCallback(future, new FutureCallback<TimePageData<Asset>>() {
215 227 @Override
216 228 public void onSuccess(@Nullable TimePageData<Asset> pageData) {
... ... @@ -219,6 +231,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
219 231 for (Asset asset : pageData.getData()) {
220 232 saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.ASSET, EdgeEventActionType.ADDED, asset.getId(), null);
221 233 }
  234 + if (pageData.hasNext()) {
  235 + syncAssets(edge, pageData.getNextPageLink());
  236 + }
222 237 }
223 238 }
224 239
... ... @@ -232,10 +247,10 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
232 247 }
233 248 }
234 249
235   - private void syncEntityViews(Edge edge) {
  250 + private void syncEntityViews(Edge edge, TimePageLink pageLink) {
236 251 log.trace("[{}] syncEntityViews [{}]", edge.getTenantId(), edge.getName());
237 252 try {
238   - ListenableFuture<TimePageData<EntityView>> future = entityViewService.findEntityViewsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE));
  253 + ListenableFuture<TimePageData<EntityView>> future = entityViewService.findEntityViewsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
239 254 Futures.addCallback(future, new FutureCallback<TimePageData<EntityView>>() {
240 255 @Override
241 256 public void onSuccess(@Nullable TimePageData<EntityView> pageData) {
... ... @@ -244,6 +259,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
244 259 for (EntityView entityView : pageData.getData()) {
245 260 saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.ENTITY_VIEW, EdgeEventActionType.ADDED, entityView.getId(), null);
246 261 }
  262 + if (pageData.hasNext()) {
  263 + syncEntityViews(edge, pageData.getNextPageLink());
  264 + }
247 265 }
248 266 }
249 267
... ... @@ -257,10 +275,10 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
257 275 }
258 276 }
259 277
260   - private void syncDashboards(Edge edge) {
  278 + private void syncDashboards(Edge edge, TimePageLink pageLink) {
261 279 log.trace("[{}] syncDashboards [{}]", edge.getTenantId(), edge.getName());
262 280 try {
263   - ListenableFuture<TimePageData<DashboardInfo>> future = dashboardService.findDashboardsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE));
  281 + ListenableFuture<TimePageData<DashboardInfo>> future = dashboardService.findDashboardsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink);
264 282 Futures.addCallback(future, new FutureCallback<TimePageData<DashboardInfo>>() {
265 283 @Override
266 284 public void onSuccess(@Nullable TimePageData<DashboardInfo> pageData) {
... ... @@ -269,6 +287,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
269 287 for (DashboardInfo dashboardInfo : pageData.getData()) {
270 288 saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.DASHBOARD, EdgeEventActionType.ADDED, dashboardInfo.getId(), null);
271 289 }
  290 + if (pageData.hasNext()) {
  291 + syncDashboards(edge, pageData.getNextPageLink());
  292 + }
272 293 }
273 294 }
274 295
... ... @@ -285,18 +306,36 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
285 306 private void syncUsers(Edge edge) {
286 307 log.trace("[{}] syncUsers [{}]", edge.getTenantId(), edge.getName());
287 308 try {
288   - TextPageData<User> pageData = userService.findTenantAdmins(edge.getTenantId(), new TextPageLink(Integer.MAX_VALUE));
289   - pushUsersToEdge(pageData, edge);
290   - if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
291   - saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null);
292   - pageData = userService.findCustomerUsers(edge.getTenantId(), edge.getCustomerId(), new TextPageLink(Integer.MAX_VALUE));
  309 + TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
  310 + TextPageData<User> pageData;
  311 + do {
  312 + pageData = userService.findTenantAdmins(edge.getTenantId(), pageLink);
293 313 pushUsersToEdge(pageData, edge);
294   - }
  314 + syncCustomerUsers(edge);
  315 + if (pageData != null && pageData.hasNext()) {
  316 + pageLink = pageData.getNextPageLink();
  317 + }
  318 + } while (pageData != null && pageData.hasNext());
295 319 } catch (Exception e) {
296 320 log.error("Exception during loading edge user(s) on sync!", e);
297 321 }
298 322 }
299 323
  324 + private void syncCustomerUsers(Edge edge) {
  325 + if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
  326 + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null);
  327 + TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
  328 + TextPageData<User> pageData;
  329 + do {
  330 + pageData = userService.findCustomerUsers(edge.getTenantId(), edge.getCustomerId(), pageLink);
  331 + pushUsersToEdge(pageData, edge);
  332 + if (pageData != null && pageData.hasNext()) {
  333 + pageLink = pageData.getNextPageLink();
  334 + }
  335 + } while (pageData != null && pageData.hasNext());
  336 + }
  337 + }
  338 +
300 339 private void syncWidgetsBundleAndWidgetTypes(Edge edge) {
301 340 log.trace("[{}] syncWidgetsBundleAndWidgetTypes [{}]", edge.getTenantId(), edge.getName());
302 341 List<WidgetsBundle> widgetsBundlesToPush = new ArrayList<>();
... ... @@ -426,7 +465,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
426 465 final EdgeEventType type = getEdgeQueueTypeByEntityType(entityId.getEntityType());
427 466 if (type != null) {
428 467 SettableFuture<Void> futureToSet = SettableFuture.create();
429   - ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SERVER_SCOPE);
  468 + String scope = attributesRequestMsg.getScope();
  469 + ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, scope);
430 470 Futures.addCallback(ssAttrFuture, new FutureCallback<List<AttributeKvEntry>>() {
431 471 @Override
432 472 public void onSuccess(@Nullable List<AttributeKvEntry> ssAttributes) {
... ... @@ -446,7 +486,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
446 486 }
447 487 }
448 488 entityData.put("kv", attributes);
449   - entityData.put("scope", DataConstants.SERVER_SCOPE);
  489 + entityData.put("scope", scope);
450 490 JsonNode body = mapper.valueToTree(entityData);
451 491 log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body);
452 492 saveEdgeEvent(edge.getTenantId(),
... ... @@ -459,6 +499,11 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
459 499 log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e);
460 500 throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e);
461 501 }
  502 + } else {
  503 + log.trace("[{}][{}] No attributes found for entity {} [{}]", edge.getTenantId(),
  504 + edge.getName(),
  505 + entityId.getEntityType(),
  506 + entityId.getId());
462 507 }
463 508 futureToSet.set(null);
464 509 }
... ... @@ -470,10 +515,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
470 515 }
471 516 }, dbCallbackExecutorService);
472 517 return futureToSet;
473   - // TODO: voba - push shared attributes to edge?
474   - // ListenableFuture<List<AttributeKvEntry>> shAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.SHARED_SCOPE);
475   - // ListenableFuture<List<AttributeKvEntry>> clAttrFuture = attributesService.findAll(edge.getTenantId(), entityId, DataConstants.CLIENT_SCOPE);
476 518 } else {
  519 + log.warn("[{}] Type doesn't supported {}", edge.getTenantId(), entityId.getEntityType());
477 520 return Futures.immediateFuture(null);
478 521 }
479 522 }
... ... @@ -585,11 +628,11 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
585 628 }
586 629
587 630 private ListenableFuture<EdgeEvent> saveEdgeEvent(TenantId tenantId,
588   - EdgeId edgeId,
589   - EdgeEventType type,
590   - EdgeEventActionType action,
591   - EntityId entityId,
592   - JsonNode body) {
  631 + EdgeId edgeId,
  632 + EdgeEventType type,
  633 + EdgeEventActionType action,
  634 + EntityId entityId,
  635 + JsonNode body) {
593 636 log.trace("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], type [{}], action[{}], entityId [{}], body [{}]",
594 637 tenantId, edgeId, type, action, entityId, body);
595 638
... ... @@ -602,6 +645,18 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
602 645 edgeEvent.setEntityId(entityId.getId());
603 646 }
604 647 edgeEvent.setBody(body);
605   - return edgeEventService.saveAsync(edgeEvent);
  648 + ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
  649 + Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
  650 + @Override
  651 + public void onSuccess(@Nullable EdgeEvent result) {
  652 + tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
  653 + }
  654 +
  655 + @Override
  656 + public void onFailure(Throwable t) {
  657 + log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
  658 + }
  659 + }, dbCallbackExecutorService);
  660 + return future;
606 661 }
607 662 }
... ...
... ... @@ -17,8 +17,11 @@ package org.thingsboard.server.service.edge.rpc.processor;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import com.google.common.util.concurrent.FutureCallback;
  21 +import com.google.common.util.concurrent.Futures;
20 22 import com.google.common.util.concurrent.ListenableFuture;
21 23 import lombok.extern.slf4j.Slf4j;
  24 +import org.checkerframework.checker.nullness.qual.Nullable;
22 25 import org.springframework.beans.factory.annotation.Autowired;
23 26 import org.thingsboard.server.common.data.edge.EdgeEvent;
24 27 import org.thingsboard.server.common.data.edge.EdgeEventActionType;
... ... @@ -107,6 +110,18 @@ public abstract class BaseProcessor {
107 110 edgeEvent.setEntityId(entityId.getId());
108 111 }
109 112 edgeEvent.setBody(body);
110   - return edgeEventService.saveAsync(edgeEvent);
  113 + ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
  114 + Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
  115 + @Override
  116 + public void onSuccess(@Nullable EdgeEvent result) {
  117 + tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
  118 + }
  119 +
  120 + @Override
  121 + public void onFailure(Throwable t) {
  122 + log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t);
  123 + }
  124 + }, dbCallbackExecutorService);
  125 + return future;
111 126 }
112 127 }
... ...
... ... @@ -22,10 +22,12 @@ import org.springframework.scheduling.annotation.Scheduled;
22 22 import org.springframework.stereotype.Service;
23 23 import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
24 24 import org.thingsboard.server.common.data.EntityType;
  25 +import org.thingsboard.server.common.data.id.EdgeId;
25 26 import org.thingsboard.server.common.data.id.EntityId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
27 28 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
28 29 import org.thingsboard.server.common.msg.TbMsg;
  30 +import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
29 31 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
30 32 import org.thingsboard.server.common.msg.queue.ServiceType;
31 33 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
... ... @@ -163,11 +165,31 @@ public class DefaultTbClusterService implements TbClusterService {
163 165 broadcast(new ComponentLifecycleMsg(tenantId, entityId, state));
164 166 }
165 167
  168 + @Override
  169 + public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
  170 + log.trace("[{}] Processing edge {} event update ", tenantId, edgeId);
  171 + EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId);
  172 + byte[] msgBytes = encodingService.encode(msg);
  173 + TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
  174 + Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
  175 + for (String serviceId : tbCoreServices) {
  176 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
  177 + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build();
  178 + toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg), null);
  179 + toCoreNfs.incrementAndGet();
  180 + }
  181 + }
  182 +
166 183 private void broadcast(ComponentLifecycleMsg msg) {
167 184 byte[] msgBytes = encodingService.encode(msg);
168 185 TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
169 186 Set<String> tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE));
170   - if (msg.getEntityId().getEntityType().equals(EntityType.TENANT)) {
  187 + boolean toCore = msg.getEntityId().getEntityType().equals(EntityType.TENANT) ||
  188 + msg.getEntityId().getEntityType().equals(EntityType.EDGE);
  189 +
  190 + boolean toRuleEngine = !msg.getEntityId().getEntityType().equals(EntityType.EDGE);
  191 +
  192 + if (toCore) {
171 193 TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
172 194 Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
173 195 for (String serviceId : tbCoreServices) {
... ... @@ -179,11 +201,13 @@ public class DefaultTbClusterService implements TbClusterService {
179 201 // No need to push notifications twice
180 202 tbRuleEngineServices.removeAll(tbCoreServices);
181 203 }
182   - for (String serviceId : tbRuleEngineServices) {
183   - TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
184   - ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
185   - toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null);
186   - toRuleEngineNfs.incrementAndGet();
  204 + if (toRuleEngine) {
  205 + for (String serviceId : tbRuleEngineServices) {
  206 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
  207 + ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
  208 + toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null);
  209 + toRuleEngineNfs.incrementAndGet();
  210 + }
187 211 }
188 212 }
189 213
... ...
... ... @@ -203,18 +203,24 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
203 203 log.trace("[{}] Forwarding message to RPC service {}", id, toCoreNotification.getFromDeviceRpcResponse());
204 204 forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback);
205 205 } else if (toCoreNotification.getComponentLifecycleMsg() != null && !toCoreNotification.getComponentLifecycleMsg().isEmpty()) {
206   - Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray());
207   - if (actorMsg.isPresent()) {
208   - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
209   - actorContext.tellWithHighPriority(actorMsg.get());
210   - }
211   - callback.onSuccess();
  206 + forwardToAppActor(id, toCoreNotification.getComponentLifecycleMsg().toByteArray(), callback);
  207 + } else if (toCoreNotification.getEdgeEventUpdateMsg() != null && !toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) {
  208 + forwardToAppActor(id, toCoreNotification.getEdgeEventUpdateMsg().toByteArray(), callback);
212 209 }
213 210 if (statsEnabled) {
214 211 stats.log(toCoreNotification);
215 212 }
216 213 }
217 214
  215 + private void forwardToAppActor(UUID id, byte[] msgBytes, TbCallback callback) {
  216 + Optional<TbActorMsg> actorMsg = encodingService.decode(msgBytes);
  217 + if (actorMsg.isPresent()) {
  218 + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
  219 + actorContext.tellWithHighPriority(actorMsg.get());
  220 + }
  221 + callback.onSuccess();
  222 + }
  223 +
218 224 private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) {
219 225 RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
220 226 FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB())
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.queue;
17 17
18 18 import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
  19 +import org.thingsboard.server.common.data.id.EdgeId;
19 20 import org.thingsboard.server.common.data.id.EntityId;
20 21 import org.thingsboard.server.common.data.id.TenantId;
21 22 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
... ... @@ -49,4 +50,6 @@ public interface TbClusterService {
49 50
50 51 void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
51 52
  53 + void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId);
  54 +
52 55 }
... ...
... ... @@ -52,7 +52,6 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
52 52 private final TbClusterService clusterService;
53 53 private final TbServiceInfoProvider serviceInfoProvider;
54 54
55   -
56 55 private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> toDeviceRpcRequests = new ConcurrentHashMap<>();
57 56
58 57 private Optional<TbCoreDeviceRpcService> tbCoreRpcService;
... ...
... ... @@ -98,6 +98,7 @@ import org.thingsboard.server.gen.edge.UserCredentialsUpdateMsg;
98 98 import org.thingsboard.server.gen.edge.WidgetTypeUpdateMsg;
99 99 import org.thingsboard.server.gen.edge.WidgetsBundleUpdateMsg;
100 100 import org.thingsboard.server.gen.transport.TransportProtos;
  101 +import org.thingsboard.server.service.queue.TbClusterService;
101 102
102 103 import java.util.ArrayList;
103 104 import java.util.List;
... ... @@ -122,6 +123,9 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
122 123 @Autowired
123 124 private EdgeEventService edgeEventService;
124 125
  126 + @Autowired
  127 + private TbClusterService clusterService;
  128 +
125 129 @Before
126 130 public void beforeTest() throws Exception {
127 131 loginSysAdmin();
... ... @@ -227,6 +231,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
227 231 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body);
228 232 edgeImitator.expectMessageAmount(1);
229 233 edgeEventService.saveAsync(edgeEvent);
  234 + clusterService.onEdgeEventUpdate(tenantId, edge.getId());
230 235 edgeImitator.waitForMessages();
231 236
232 237 AbstractMessage latestMessage = edgeImitator.getLatestMessage();
... ... @@ -847,6 +852,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
847 852 EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData);
848 853 edgeImitator.expectMessageAmount(1);
849 854 edgeEventService.saveAsync(edgeEvent1);
  855 + clusterService.onEdgeEventUpdate(tenantId, edge.getId());
850 856 edgeImitator.waitForMessages();
851 857
852 858 AbstractMessage latestMessage = edgeImitator.getLatestMessage();
... ... @@ -885,6 +891,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
885 891 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_DELETED, device.getId().getId(), EdgeEventType.DEVICE, deleteAttributesEntityData);
886 892 edgeImitator.expectMessageAmount(1);
887 893 edgeEventService.saveAsync(edgeEvent);
  894 + clusterService.onEdgeEventUpdate(tenantId, edge.getId());
888 895 edgeImitator.waitForMessages();
889 896
890 897 AbstractMessage latestMessage = edgeImitator.getLatestMessage();
... ... @@ -910,6 +917,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
910 917 EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.POST_ATTRIBUTES, device.getId().getId(), EdgeEventType.DEVICE, postAttributesEntityData);
911 918 edgeImitator.expectMessageAmount(1);
912 919 edgeEventService.saveAsync(edgeEvent);
  920 + clusterService.onEdgeEventUpdate(tenantId, edge.getId());
913 921 edgeImitator.waitForMessages();
914 922
915 923 AbstractMessage latestMessage = edgeImitator.getLatestMessage();
... ... @@ -934,6 +942,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
934 942 EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, attributesEntityData);
935 943 edgeImitator.expectMessageAmount(1);
936 944 edgeEventService.saveAsync(edgeEvent1);
  945 + clusterService.onEdgeEventUpdate(tenantId, edge.getId());
937 946 edgeImitator.waitForMessages();
938 947
939 948 AbstractMessage latestMessage = edgeImitator.getLatestMessage();
... ... @@ -1160,6 +1169,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1160 1169 edgeImitator.sendUplinkMsg(uplinkMsgBuilder2.build());
1161 1170 edgeImitator.waitForResponses();
1162 1171
  1172 + // Wait before device attributes saved to database before requesting them from controller
1163 1173 Thread.sleep(1000);
1164 1174 Map<String, List<Map<String, String>>> timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/values/timeseries?keys=" + timeseriesKey, Map.class);
1165 1175 Assert.assertTrue(timeseries.containsKey(timeseriesKey));
... ... @@ -1302,18 +1312,25 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1302 1312
1303 1313 private void sendAttributesRequest() throws Exception {
1304 1314 Device device = findDeviceByName("Edge Device 1");
  1315 + sendAttributesRequest(device, DataConstants.SERVER_SCOPE, "{\"key1\":\"value1\"}", "key1", "value1");
  1316 + sendAttributesRequest(device, DataConstants.SHARED_SCOPE, "{\"key2\":\"value2\"}", "key2", "value2");
  1317 + }
1305 1318
1306   - String attributesDataStr = "{\"key1\":\"value1\"}";
  1319 + private void sendAttributesRequest(Device device, String scope, String attributesDataStr, String expectedKey, String expectedValue) throws Exception {
1307 1320 JsonNode attributesData = mapper.readTree(attributesDataStr);
1308 1321
1309   - doPost("/api/plugins/telemetry/DEVICE/" + device.getId().getId().toString() + "/attributes/" + DataConstants.SERVER_SCOPE,
  1322 + doPost("/api/plugins/telemetry/DEVICE/" + device.getId().getId().toString() + "/attributes/" + scope,
1310 1323 attributesData);
1311 1324
  1325 + // Wait before device attributes saved to database before requesting them from edge
  1326 + Thread.sleep(1000);
  1327 +
1312 1328 UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
1313 1329 AttributesRequestMsg.Builder attributesRequestMsgBuilder = AttributesRequestMsg.newBuilder();
1314 1330 attributesRequestMsgBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits());
1315 1331 attributesRequestMsgBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits());
1316 1332 attributesRequestMsgBuilder.setEntityType(EntityType.DEVICE.name());
  1333 + attributesRequestMsgBuilder.setScope(scope);
1317 1334 testAutoGeneratedCodeByProtobuf(attributesRequestMsgBuilder);
1318 1335 uplinkMsgBuilder.addAttributesRequestMsg(attributesRequestMsgBuilder.build());
1319 1336 testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
... ... @@ -1330,14 +1347,14 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
1330 1347 Assert.assertEquals(device.getUuidId().getMostSignificantBits(), latestEntityDataMsg.getEntityIdMSB());
1331 1348 Assert.assertEquals(device.getUuidId().getLeastSignificantBits(), latestEntityDataMsg.getEntityIdLSB());
1332 1349 Assert.assertEquals(device.getId().getEntityType().name(), latestEntityDataMsg.getEntityType());
1333   - Assert.assertEquals("SERVER_SCOPE", latestEntityDataMsg.getPostAttributeScope());
  1350 + Assert.assertEquals(scope, latestEntityDataMsg.getPostAttributeScope());
1334 1351 Assert.assertTrue(latestEntityDataMsg.hasAttributesUpdatedMsg());
1335 1352
1336 1353 TransportProtos.PostAttributeMsg attributesUpdatedMsg = latestEntityDataMsg.getAttributesUpdatedMsg();
1337 1354 Assert.assertEquals(1, attributesUpdatedMsg.getKvCount());
1338 1355 TransportProtos.KeyValueProto keyValueProto = attributesUpdatedMsg.getKv(0);
1339   - Assert.assertEquals("key1", keyValueProto.getKey());
1340   - Assert.assertEquals("value1", keyValueProto.getStringV());
  1356 + Assert.assertEquals(expectedKey, keyValueProto.getKey());
  1357 + Assert.assertEquals(expectedValue, keyValueProto.getStringV());
1341 1358 }
1342 1359
1343 1360 private void sendDeleteDeviceOnEdge() throws Exception {
... ...
... ... @@ -56,6 +56,8 @@ import java.util.Optional;
56 56 import java.util.UUID;
57 57 import java.util.concurrent.CountDownLatch;
58 58 import java.util.concurrent.TimeUnit;
  59 +import java.util.concurrent.locks.Lock;
  60 +import java.util.concurrent.locks.ReentrantLock;
59 61
60 62 @Slf4j
61 63 public class EdgeImitator {
... ... @@ -65,6 +67,8 @@ public class EdgeImitator {
65 67
66 68 private EdgeRpcClient edgeRpcClient;
67 69
  70 + private final Lock lock = new ReentrantLock();
  71 +
68 72 private CountDownLatch messagesLatch;
69 73 private CountDownLatch responsesLatch;
70 74 private List<Class<? extends AbstractMessage>> ignoredTypes;
... ... @@ -74,7 +78,7 @@ public class EdgeImitator {
74 78 @Getter
75 79 private UserId userId;
76 80 @Getter
77   - private List<AbstractMessage> downlinkMsgs;
  81 + private final List<AbstractMessage> downlinkMsgs;
78 82
79 83 public EdgeImitator(String host, int port, String routingKey, String routingSecret) throws NoSuchFieldException, IllegalAccessException {
80 84 edgeRpcClient = new EdgeGrpcClient();
... ... @@ -241,7 +245,12 @@ public class EdgeImitator {
241 245
242 246 private ListenableFuture<Void> saveDownlinkMsg(AbstractMessage message) {
243 247 if (!ignoredTypes.contains(message.getClass())) {
244   - downlinkMsgs.add(message);
  248 + try {
  249 + lock.lock();
  250 + downlinkMsgs.add(message);
  251 + } finally {
  252 + lock.unlock();
  253 + }
245 254 messagesLatch.countDown();
246 255 }
247 256 return Futures.immediateFuture(null);
... ... @@ -262,7 +271,14 @@ public class EdgeImitator {
262 271 }
263 272
264 273 public <T> Optional<T> findMessageByType(Class<T> tClass) {
265   - return (Optional<T>) downlinkMsgs.stream().filter(downlinkMsg -> downlinkMsg.getClass().isAssignableFrom(tClass)).findAny();
  274 + Optional<T> result;
  275 + try {
  276 + lock.lock();
  277 + result = (Optional<T>) downlinkMsgs.stream().filter(downlinkMsg -> downlinkMsg.getClass().isAssignableFrom(tClass)).findAny();
  278 + } finally {
  279 + lock.unlock();
  280 + }
  281 + return result;
266 282 }
267 283
268 284 public AbstractMessage getLatestMessage() {
... ...
... ... @@ -315,6 +315,7 @@ message AttributesRequestMsg {
315 315 int64 entityIdMSB = 1;
316 316 int64 entityIdLSB = 2;
317 317 string entityType = 3;
  318 + string scope = 4;
318 319 }
319 320
320 321 message RelationRequestMsg {
... ...
... ... @@ -105,6 +105,11 @@ public enum MsgType {
105 105 /**
106 106 * Message that is sent by TransportRuleEngineService to Device Actor. Represents messages from the device itself.
107 107 */
108   - TRANSPORT_TO_DEVICE_ACTOR_MSG;
  108 + TRANSPORT_TO_DEVICE_ACTOR_MSG,
  109 +
  110 + /**
  111 + * Message that is sent on Edge Event to Edge Session
  112 + */
  113 + EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG;
109 114
110 115 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.edge;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.ToString;
  20 +import org.thingsboard.server.common.data.id.EdgeId;
  21 +import org.thingsboard.server.common.data.id.TenantId;
  22 +import org.thingsboard.server.common.msg.MsgType;
  23 +import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
  24 +import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
  25 +
  26 +@ToString
  27 +public class EdgeEventUpdateMsg implements TenantAwareMsg, ToAllNodesMsg {
  28 + @Getter
  29 + private final TenantId tenantId;
  30 + @Getter
  31 + private final EdgeId edgeId;
  32 +
  33 + public EdgeEventUpdateMsg(TenantId tenantId, EdgeId edgeId) {
  34 + this.tenantId = tenantId;
  35 + this.edgeId = edgeId;
  36 + }
  37 +
  38 + @Override
  39 + public MsgType getMsgType() {
  40 + return MsgType.EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG;
  41 + }
  42 +}
... ...
... ... @@ -400,6 +400,7 @@ message ToCoreNotificationMsg {
400 400 LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1;
401 401 FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
402 402 bytes componentLifecycleMsg = 3;
  403 + bytes edgeEventUpdateMsg = 4;
403 404 }
404 405
405 406 /* Messages that are handled by ThingsBoard RuleEngine Service */
... ...
... ... @@ -98,6 +98,8 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
98 98 public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId ";
99 99 public static final String INCORRECT_EDGE_ID = "Incorrect edgeId ";
100 100
  101 + private static final int DEFAULT_LIMIT = 100;
  102 +
101 103 private RestTemplate restTemplate;
102 104
103 105 private static final String EDGE_LICENSE_SERVER_ENDPOINT = "https://license.thingsboard.io";
... ... @@ -460,8 +462,21 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
460 462 public ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) {
461 463 log.trace("[{}] Executing findRelatedEdgeIdsByEntityId [{}]", tenantId, entityId);
462 464 if (EntityType.TENANT.equals(entityId.getEntityType())) {
463   - TextPageData<Edge> edgesByTenantId = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
464   - return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList()));
  465 + List<EdgeId> result = new ArrayList<>();
  466 + TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
  467 + TextPageData<Edge> pageData;
  468 + do {
  469 + pageData = findEdgesByTenantId(tenantId, pageLink);
  470 + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
  471 + for (Edge edge : pageData.getData()) {
  472 + result.add(edge.getId());
  473 + }
  474 + if (pageData.hasNext()) {
  475 + pageLink = pageData.getNextPageLink();
  476 + }
  477 + }
  478 + } while (pageData != null && pageData.hasNext());
  479 + return Futures.immediateFuture(result);
465 480 } else {
466 481 switch (entityId.getEntityType()) {
467 482 case DEVICE:
... ... @@ -486,13 +501,23 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
486 501 if (userById == null) {
487 502 return Futures.immediateFuture(Collections.emptyList());
488 503 }
489   - TextPageData<Edge> edges;
490   - if (userById.getCustomerId() == null || userById.getCustomerId().isNullUid()) {
491   - edges = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE));
492   - } else {
493   - edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE));
494   - }
495   - return convertToEdgeIds(Futures.immediateFuture(edges.getData()));
  504 + List<Edge> result = new ArrayList<>();
  505 + TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
  506 + TextPageData<Edge> pageData;
  507 + do {
  508 + if (userById.getCustomerId() == null || userById.getCustomerId().isNullUid()) {
  509 + pageData = findEdgesByTenantId(tenantId, pageLink);
  510 + } else {
  511 + pageData = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), pageLink);
  512 + }
  513 + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
  514 + result.addAll(pageData.getData());
  515 + if (pageData.hasNext()) {
  516 + pageLink = pageData.getNextPageLink();
  517 + }
  518 + }
  519 + } while (pageData != null && pageData.hasNext());
  520 + return convertToEdgeIds(Futures.immediateFuture(result));
496 521 default:
497 522 return Futures.immediateFuture(Collections.emptyList());
498 523 }
... ...
... ... @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.Customer;
23 23 import org.thingsboard.server.common.data.Device;
24 24 import org.thingsboard.server.common.data.alarm.Alarm;
25 25 import org.thingsboard.server.common.data.asset.Asset;
  26 +import org.thingsboard.server.common.data.id.EdgeId;
26 27 import org.thingsboard.server.common.data.id.EntityId;
27 28 import org.thingsboard.server.common.data.id.RuleNodeId;
28 29 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -145,6 +146,8 @@ public interface TbContext {
145 146 // TODO: Does this changes the message?
146 147 TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action);
147 148
  149 + void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId);
  150 +
148 151 /*
149 152 *
150 153 * METHODS TO PROCESS THE MESSAGES
... ...
... ... @@ -115,11 +115,12 @@ public class TbMsgPushToEdgeNode implements TbNode {
115 115 @Override
116 116 public void onSuccess(@Nullable EdgeEvent event) {
117 117 ctx.tellNext(msg, SUCCESS);
  118 + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
118 119 }
119 120
120 121 @Override
121 122 public void onFailure(Throwable th) {
122   - log.error("Could not save edge event", th);
  123 + log.warn("[{}] Can't save edge event [{}] for edge [{}]", ctx.getTenantId().getId(), edgeEvent, edgeId.getId(), th);
123 124 ctx.tellFailure(msg, th);
124 125 }
125 126 }, ctx.getDbCallbackExecutor());
... ...