Commit d380ac710646a2aea8693ea14aba1eaa4b9f4994

Authored by Andrii Shvaika
1 parent 006fcd03

Device -> Server RPC implementation and improvements

Showing 28 changed files with 266 additions and 320 deletions
... ... @@ -270,10 +270,6 @@ public class ActorSystemContext {
270 270 @Getter
271 271 private long queuePersistenceTimeout;
272 272
273   - @Value("${actors.client_side_rpc.timeout}")
274   - @Getter
275   - private long clientSideRpcTimeout;
276   -
277 273 @Value("${actors.rule.chain.error_persist_frequency}")
278 274 @Getter
279 275 private long ruleChainErrorPersistFrequency;
... ...
... ... @@ -22,11 +22,8 @@ import org.thingsboard.server.actors.service.ContextAwareActor;
22 22 import org.thingsboard.server.common.data.id.DeviceId;
23 23 import org.thingsboard.server.common.data.id.TenantId;
24 24 import org.thingsboard.server.common.msg.TbActorMsg;
25   -import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
26   -import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
27 25 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
28 26 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
29   -import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
30 27 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
31 28
32 29 public class DeviceActor extends ContextAwareActor {
... ... @@ -72,15 +69,9 @@ public class DeviceActor extends ContextAwareActor {
72 69 case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
73 70 processor.processRpcRequest(context(), (ToDeviceRpcRequestActorMsg) msg);
74 71 break;
75   - case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
76   - processor.processToServerRPCResponse(context(), (ToServerRpcResponseActorMsg) msg);
77   - break;
78 72 case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG:
79 73 processor.processServerSideRpcTimeout(context(), (DeviceActorServerSideRpcTimeoutMsg) msg);
80 74 break;
81   - case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
82   - processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
83   - break;
84 75 case SESSION_TIMEOUT_MSG:
85 76 processor.checkSessionsTimeout();
86 77 break;
... ...
... ... @@ -36,12 +36,11 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
36 36 import org.thingsboard.server.common.data.kv.KvEntry;
37 37 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
38 38 import org.thingsboard.server.common.msg.TbMsgMetaData;
  39 +import org.thingsboard.server.common.msg.queue.TbCallback;
39 40 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
40   -import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
41 41 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
42   -import org.thingsboard.server.gen.transport.TransportProtos;
43 42 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
44   -import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
  43 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
45 44 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
46 45 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
47 46 import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
... ... @@ -50,16 +49,19 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotifica
50 49 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
51 50 import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
52 51 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  52 +import org.thingsboard.server.gen.transport.TransportProtos.SessionSubscriptionInfoProto;
  53 +import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
53 54 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
54 55 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
  56 +import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
55 57 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
56 58 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
  59 +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
  60 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
57 61 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
58 62 import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
59   -import org.thingsboard.server.common.msg.queue.TbCallback;
60 63 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
61 64 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
62   -import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
63 65 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
64 66
65 67 import javax.annotation.Nullable;
... ... @@ -91,7 +93,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
91 93 private final Map<UUID, SessionInfo> attributeSubscriptions;
92 94 private final Map<UUID, SessionInfo> rpcSubscriptions;
93 95 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
94   - private final Map<Integer, ToServerRpcRequestMetadata> toServerRpcPendingMap;
95 96
96 97 private int rpcSeq = 0;
97 98 private String deviceName;
... ... @@ -106,7 +107,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
106 107 this.attributeSubscriptions = new HashMap<>();
107 108 this.rpcSubscriptions = new HashMap<>();
108 109 this.toDeviceRpcPendingMap = new HashMap<>();
109   - this.toServerRpcPendingMap = new HashMap<>();
110 110 if (initAttributes()) {
111 111 restoreSessions();
112 112 }
... ... @@ -142,7 +142,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
142 142 Set<UUID> syncSessionSet = new HashSet<>();
143 143 rpcSubscriptions.forEach((key, value) -> {
144 144 sendToTransport(rpcRequest, key, value.getNodeId());
145   - if (TransportProtos.SessionType.SYNC == value.getType()) {
  145 + if (SessionType.SYNC == value.getType()) {
146 146 syncSessionSet.add(key);
147 147 }
148 148 });
... ... @@ -177,10 +177,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
177 177 }
178 178
179 179 private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) {
180   - TransportProtos.SessionType sessionType = getSessionType(sessionId);
  180 + SessionType sessionType = getSessionType(sessionId);
181 181 if (!toDeviceRpcPendingMap.isEmpty()) {
182 182 log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
183   - if (sessionType == TransportProtos.SessionType.SYNC) {
  183 + if (sessionType == SessionType.SYNC) {
184 184 log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
185 185 rpcSubscriptions.remove(sessionId);
186 186 }
... ... @@ -188,7 +188,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
188 188 log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
189 189 }
190 190 Set<Integer> sentOneWayIds = new HashSet<>();
191   - if (sessionType == TransportProtos.SessionType.ASYNC) {
  191 + if (sessionType == SessionType.ASYNC) {
192 192 toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
193 193 } else {
194 194 toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
... ... @@ -297,46 +297,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
297 297 return new HashSet<>(strings);
298 298 }
299 299
300   - private void handleClientSideRPCRequest(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg request) {
301   -// UUID sessionId = getSessionId(sessionInfo);
302   -// JsonObject json = new JsonObject();
303   -// json.addProperty("method", request.getMethodName());
304   -// json.add("params", JsonUtils.parse(request.getParams()));
305   -//
306   -// TbMsgMetaData requestMetaData = defaultMetaData.copy();
307   -// requestMetaData.putValue("requestId", Integer.toString(request.getRequestId()));
308   -// TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, requestMetaData, TbMsgDataType.JSON, gson.toJson(json), null, null, null);
309   -// context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
310   -//
311   -// scheduleMsgWithDelay(context, new DeviceActorClientSideRpcTimeoutMsg(request.getRequestId(), systemContext.getClientSideRpcTimeout()), systemContext.getClientSideRpcTimeout());
312   -// toServerRpcPendingMap.put(request.getRequestId(), new ToServerRpcRequestMetadata(sessionId, getSessionType(sessionId), sessionInfo.getNodeId()));
313   - }
314   -
315   - private TransportProtos.SessionType getSessionType(UUID sessionId) {
316   - return sessions.containsKey(sessionId) ? TransportProtos.SessionType.ASYNC : TransportProtos.SessionType.SYNC;
317   - }
318   -
319   - void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) {
320   - ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId());
321   - if (data != null) {
322   - log.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId());
323   - sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
324   - .setRequestId(msg.getId()).setError("timeout").build()
325   - , data.getSessionId(), data.getNodeId());
326   - }
327   - }
328   -
329   - void processToServerRPCResponse(ActorContext context, ToServerRpcResponseActorMsg msg) {
330   - int requestId = msg.getMsg().getRequestId();
331   - ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
332   - if (data != null) {
333   - log.debug("[{}] Pushing reply to [{}][{}]!", deviceId, data.getNodeId(), data.getSessionId());
334   - sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder()
335   - .setRequestId(requestId).setPayload(msg.getMsg().getData()).build()
336   - , data.getSessionId(), data.getNodeId());
337   - } else {
338   - log.debug("[{}][{}] Pending RPC request to server not found!", deviceId, requestId);
339   - }
  300 + private SessionType getSessionType(UUID sessionId) {
  301 + return sessions.containsKey(sessionId) ? SessionType.ASYNC : SessionType.SYNC;
340 302 }
341 303
342 304 void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) {
... ... @@ -399,7 +361,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
399 361 } else {
400 362 SessionInfoMetaData sessionMD = sessions.get(sessionId);
401 363 if (sessionMD == null) {
402   - sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
  364 + sessionMD = new SessionInfoMetaData(new SessionInfo(SessionType.SYNC, sessionInfo.getNodeId()));
403 365 }
404 366 sessionMD.setSubscribedToAttributes(true);
405 367 log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
... ... @@ -420,7 +382,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
420 382 } else {
421 383 SessionInfoMetaData sessionMD = sessions.get(sessionId);
422 384 if (sessionMD == null) {
423   - sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
  385 + sessionMD = new SessionInfoMetaData(new SessionInfo(SessionType.SYNC, sessionInfo.getNodeId()));
424 386 }
425 387 sessionMD.setSubscribedToRPC(true);
426 388 log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
... ... @@ -444,7 +406,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
444 406 notifyTransportAboutClosedSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
445 407 }
446 408 }
447   - sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId())));
  409 + sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
448 410 if (sessions.size() == 1) {
449 411 reportSessionOpen();
450 412 }
... ... @@ -462,10 +424,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
462 424 }
463 425 }
464 426
465   - private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfoProto, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
  427 + private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) {
466 428 UUID sessionId = getSessionId(sessionInfoProto);
467 429 SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
468   - id -> new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L));
  430 + id -> new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L));
469 431
470 432 sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
471 433 sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
... ... @@ -488,7 +450,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
488 450 }
489 451
490 452 private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd) {
491   - DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
  453 + ToTransportMsg msg = ToTransportMsg.newBuilder()
492 454 .setSessionIdMSB(sessionId.getMostSignificantBits())
493 455 .setSessionIdLSB(sessionId.getLeastSignificantBits())
494 456 .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build();
... ... @@ -504,7 +466,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
504 466 }
505 467
506 468 private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) {
507   - DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
  469 + ToTransportMsg msg = ToTransportMsg.newBuilder()
508 470 .setSessionIdMSB(sessionInfo.getSessionIdMSB())
509 471 .setSessionIdLSB(sessionInfo.getSessionIdLSB())
510 472 .setGetAttributesResponse(responseMsg).build();
... ... @@ -512,7 +474,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
512 474 }
513 475
514 476 private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) {
515   - DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
  477 + ToTransportMsg msg = ToTransportMsg.newBuilder()
516 478 .setSessionIdMSB(sessionId.getMostSignificantBits())
517 479 .setSessionIdLSB(sessionId.getLeastSignificantBits())
518 480 .setAttributeUpdateNotification(notificationMsg).build();
... ... @@ -520,15 +482,15 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
520 482 }
521 483
522 484 private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) {
523   - DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
  485 + ToTransportMsg msg = ToTransportMsg.newBuilder()
524 486 .setSessionIdMSB(sessionId.getMostSignificantBits())
525 487 .setSessionIdLSB(sessionId.getLeastSignificantBits())
526 488 .setToDeviceRequest(rpcMsg).build();
527 489 systemContext.getTbCoreToTransportService().process(nodeId, msg);
528 490 }
529 491
530   - private void sendToTransport(TransportProtos.ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
531   - DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
  492 + private void sendToTransport(ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
  493 + ToTransportMsg msg = ToTransportMsg.newBuilder()
532 494 .setSessionIdMSB(sessionId.getMostSignificantBits())
533 495 .setSessionIdLSB(sessionId.getLeastSignificantBits())
534 496 .setToServerResponse(rpcMsg).build();
... ... @@ -584,9 +546,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
584 546
585 547 private void restoreSessions() {
586 548 log.debug("[{}] Restoring sessions from cache", deviceId);
587   - TransportProtos.DeviceSessionsCacheEntry sessionsDump = null;
  549 + DeviceSessionsCacheEntry sessionsDump = null;
588 550 try {
589   - sessionsDump = TransportProtos.DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
  551 + sessionsDump = DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
590 552 } catch (InvalidProtocolBufferException e) {
591 553 log.warn("[{}] Failed to decode device sessions from cache", deviceId);
592 554 return;
... ... @@ -595,11 +557,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
595 557 log.debug("[{}] No session information found", deviceId);
596 558 return;
597 559 }
598   - for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
  560 + for (SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
599 561 SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
600 562 UUID sessionId = getSessionId(sessionInfoProto);
601   - SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
602   - TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
  563 + SessionInfo sessionInfo = new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId());
  564 + SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
603 565 SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
604 566 sessions.put(sessionId, sessionMD);
605 567 if (subInfo.getAttributeSubscription()) {
... ... @@ -617,27 +579,27 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
617 579
618 580 private void dumpSessions() {
619 581 log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
620   - List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
  582 + List<SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
621 583 sessions.forEach((uuid, sessionMD) -> {
622   - if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
  584 + if (sessionMD.getSessionInfo().getType() == SessionType.SYNC) {
623 585 return;
624 586 }
625 587 SessionInfo sessionInfo = sessionMD.getSessionInfo();
626   - TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder()
  588 + SubscriptionInfoProto subscriptionInfoProto = SubscriptionInfoProto.newBuilder()
627 589 .setLastActivityTime(sessionMD.getLastActivityTime())
628 590 .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
629 591 .setRpcSubscription(sessionMD.isSubscribedToRPC()).build();
630   - TransportProtos.SessionInfoProto sessionInfoProto = TransportProtos.SessionInfoProto.newBuilder()
  592 + SessionInfoProto sessionInfoProto = SessionInfoProto.newBuilder()
631 593 .setSessionIdMSB(uuid.getMostSignificantBits())
632 594 .setSessionIdLSB(uuid.getLeastSignificantBits())
633 595 .setNodeId(sessionInfo.getNodeId()).build();
634   - sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
  596 + sessionsList.add(SessionSubscriptionInfoProto.newBuilder()
635 597 .setSessionInfo(sessionInfoProto)
636 598 .setSubscriptionInfo(subscriptionInfoProto).build());
637 599 log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
638 600 });
639 601 systemContext.getDeviceSessionCacheService()
640   - .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
  602 + .put(deviceId, DeviceSessionsCacheEntry.newBuilder()
641 603 .addAllSessions(sessionsList).build().toByteArray());
642 604 }
643 605
... ...
... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
37 37
38 38 import java.util.HashSet;
39 39 import java.util.Set;
  40 +import java.util.UUID;
40 41
41 42 @Service
42 43 @Slf4j
... ... @@ -100,7 +101,12 @@ public class DefaultTbClusterService implements TbClusterService {
100 101 response.getResponse().ifPresent(builder::setResponse);
101 102 ToRuleEngineNotificationMsg msg = ToRuleEngineNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build();
102 103 producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), null);
  104 + }
103 105
  106 + @Override
  107 + public void onToTransportMsg(String serviceId, ToTransportMsg response) {
  108 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId);
  109 + producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), null);
104 110 }
105 111
106 112 @Override
... ...
... ... @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.EntityId;
20 20 import org.thingsboard.server.common.data.id.TenantId;
21 21 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
22 22 import org.thingsboard.server.common.msg.TbMsg;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
23 24 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
24 25
25 26 public interface TbClusterService {
... ... @@ -32,6 +33,8 @@ public interface TbClusterService {
32 33
33 34 void onToRuleEngineMsg(String targetServiceId, FromDeviceRpcResponse response);
34 35
  36 + void onToTransportMsg(String targetServiceId, ToTransportMsg response);
  37 +
35 38 void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
36 39
37 40 }
... ...
... ... @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
27 27 import org.thingsboard.server.common.msg.queue.ServiceType;
28 28 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
29 29 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  30 +import org.thingsboard.server.gen.transport.TransportProtos;
30 31 import org.thingsboard.server.queue.discovery.PartitionService;
31 32 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
32 33 import org.thingsboard.server.queue.util.TbRuleEngineComponent;
... ... @@ -52,6 +53,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
52 53 private final TbClusterService clusterService;
53 54 private final TbServiceInfoProvider serviceInfoProvider;
54 55
  56 +
55 57 private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> toDeviceRpcRequests = new ConcurrentHashMap<>();
56 58
57 59 private Optional<TbCoreDeviceRpcService> tbCoreRpcService;
... ... @@ -85,8 +87,16 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
85 87 }
86 88
87 89 @Override
88   - public void sendRpcReplyToDevice(DeviceId deviceId, int requestId, String body) {
89   -// TODO 2.5
  90 + public void sendRpcReplyToDevice(String serviceId, UUID sessionId, int requestId, String body) {
  91 + TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder()
  92 + .setRequestId(requestId)
  93 + .setPayload(body).build();
  94 + TransportProtos.ToTransportMsg msg = TransportProtos.ToTransportMsg.newBuilder()
  95 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  96 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  97 + .setToServerResponse(responseMsg)
  98 + .build();
  99 + clusterService.onToTransportMsg(serviceId, msg);
90 100 }
91 101
92 102 @Override
... ...
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.service.rpc;
17   -
18   -import lombok.Getter;
19   -import lombok.RequiredArgsConstructor;
20   -import lombok.ToString;
21   -import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
22   -import org.thingsboard.server.common.data.id.DeviceId;
23   -import org.thingsboard.server.common.data.id.TenantId;
24   -import org.thingsboard.server.common.msg.MsgType;
25   -import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
26   -
27   -/**
28   - * Created by ashvayka on 16.04.18.
29   - */
30   -@ToString
31   -@RequiredArgsConstructor
32   -public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg {
33   -
34   - @Getter
35   - private final TenantId tenantId;
36   -
37   - @Getter
38   - private final DeviceId deviceId;
39   -
40   - @Getter
41   - private final ToServerRpcResponseMsg msg;
42   -
43   - @Override
44   - public MsgType getMsgType() {
45   - return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG;
46   - }
47   -}
... ... @@ -18,13 +18,14 @@ package org.thingsboard.server.service.transport;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.stereotype.Service;
  21 +import org.thingsboard.server.common.msg.queue.ServiceType;
  22 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
21 24 import org.thingsboard.server.queue.TbQueueCallback;
22 25 import org.thingsboard.server.queue.TbQueueMsgMetadata;
23 26 import org.thingsboard.server.queue.TbQueueProducer;
24 27 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
25   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
26   -import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
27   -import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  28 +import org.thingsboard.server.queue.discovery.PartitionService;
28 29 import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
29 30 import org.thingsboard.server.queue.util.TbCoreComponent;
30 31
... ... @@ -38,28 +39,26 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
38 39 @TbCoreComponent
39 40 public class DefaultTbCoreToTransportService implements TbCoreToTransportService {
40 41
  42 + private final PartitionService partitionService;
41 43 private final TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> tbTransportProducer;
42 44
43   - @Value("${queue.transport.notifications_topic}")
44   - private String notificationsTopic;
45   -
46   - public DefaultTbCoreToTransportService(TbQueueProducerProvider tbQueueProducerProvider) {
  45 + public DefaultTbCoreToTransportService(PartitionService partitionService, TbQueueProducerProvider tbQueueProducerProvider) {
  46 + this.partitionService = partitionService;
47 47 this.tbTransportProducer = tbQueueProducerProvider.getTransportNotificationsMsgProducer();
48 48 }
49 49
50 50 @Override
51   - public void process(String nodeId, DeviceActorToTransportMsg msg) {
  51 + public void process(String nodeId, ToTransportMsg msg) {
52 52 process(nodeId, msg, null, null);
53 53 }
54 54
55 55 @Override
56   - public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
57   - String topic = notificationsTopic + "." + nodeId;
  56 + public void process(String nodeId, ToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
  57 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId);
58 58 UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB());
59   - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build();
60   - log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg);
61   - TbProtoQueueMsg<ToTransportMsg> queueMsg = new TbProtoQueueMsg<>(NULL_UUID, transportMsg);
62   - tbTransportProducer.send(TopicPartitionInfo.builder().topic(topic).build(), queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
  59 + log.trace("[{}][{}] Pushing session data to topic: {}", tpi.getFullTopicName(), sessionId, msg);
  60 + TbProtoQueueMsg<ToTransportMsg> queueMsg = new TbProtoQueueMsg<>(NULL_UUID, msg);
  61 + tbTransportProducer.send(tpi, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
63 62 }
64 63
65 64 private static class QueueCallbackAdaptor implements TbQueueCallback {
... ...
... ... @@ -15,14 +15,14 @@
15 15 */
16 16 package org.thingsboard.server.service.transport;
17 17
18   -import org.thingsboard.server.gen.transport.TransportProtos;
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
19 19
20 20 import java.util.function.Consumer;
21 21
22 22 public interface TbCoreToTransportService {
23 23
24   - void process(String nodeId, TransportProtos.DeviceActorToTransportMsg msg);
  24 + void process(String nodeId, ToTransportMsg msg);
25 25
26   - void process(String nodeId, TransportProtos.DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure);
  26 + void process(String nodeId, ToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure);
27 27
28 28 }
... ...
... ... @@ -251,8 +251,6 @@ actors:
251 251 enabled: "${ACTORS_QUEUE_ENABLED:true}"
252 252 # Maximum allowed timeout for persistence into the queue
253 253 timeout: "${ACTORS_QUEUE_PERSISTENCE_TIMEOUT:30000}"
254   - client_side_rpc:
255   - timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}"
256 254
257 255 cache:
258 256 # caffeine or redis
... ... @@ -458,6 +456,8 @@ transport:
458 456 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
459 457 # Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check)
460 458 max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
  459 + client_side_rpc:
  460 + timeout: "${CLIENT_SIDE_RPC_TIMEOUT:60000}"
461 461 # Local HTTP transport parameters
462 462 http:
463 463 enabled: "${HTTP_ENABLED:true}"
... ... @@ -552,15 +552,15 @@ queue:
552 552 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
553 553 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
554 554 transport_api:
555   - requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
556   - responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
  555 + requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
  556 + responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
557 557 max_pending_requests: "${TB_QUEUE_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
558 558 max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
559 559 max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}"
560 560 request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
561 561 response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
562 562 core:
563   - topic: "${TB_QUEUE_CORE_TOPIC:tb.core}"
  563 + topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
564 564 poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
565 565 partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
566 566 pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -569,9 +569,9 @@ queue:
569 569 print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
570 570 js:
571 571 # JS Eval request topic
572   - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
  572 + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
573 573 # JS Eval responses topic prefix that is combined with node id
574   - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}"
  574 + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}"
575 575 # JS Eval max pending requests
576 576 max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
577 577 # JS Eval max request timeout
... ... @@ -581,7 +581,7 @@ queue:
581 581 # JS response auto commit interval
582 582 response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
583 583 rule-engine:
584   - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}"
  584 + topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
585 585 poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
586 586 pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
587 587 stats:
... ... @@ -589,7 +589,7 @@ queue:
589 589 print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
590 590 queues:
591 591 - name: "Main"
592   - topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}"
  592 + topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
593 593 poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
594 594 partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
595 595 pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -604,7 +604,7 @@ queue:
604 604 failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
605 605 pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
606 606 - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
607   - topic: "${TB_QUEUE_RE_HP_TOPIC:tb.rule-engine.hp}"
  607 + topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
608 608 poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
609 609 partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
610 610 pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -620,7 +620,7 @@ queue:
620 620 pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
621 621 transport:
622 622 # For high priority notifications that require minimum latency and processing time
623   - notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
  623 + notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
624 624 poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
625 625
626 626 service:
... ...
... ... @@ -89,8 +89,6 @@ public enum MsgType {
89 89
90 90 DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG,
91 91
92   - DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG,
93   -
94 92 /**
95 93 * Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
96 94 */
... ...
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.common.msg.timeout;
17   -
18   -import org.thingsboard.server.common.msg.MsgType;
19   -
20   -/**
21   - * @author Andrew Shvayka
22   - */
23   -public final class DeviceActorClientSideRpcTimeoutMsg extends TimeoutMsg<Integer> {
24   -
25   - public DeviceActorClientSideRpcTimeoutMsg(Integer id, long timeout) {
26   - super(id, timeout);
27   - }
28   -
29   - @Override
30   - public MsgType getMsgType() {
31   - return MsgType.DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG;
32   - }
33   -}
... ... @@ -196,11 +196,6 @@ public class ConsistentHashPartitionService implements PartitionService {
196 196 }
197 197 }
198 198
199   - @Override
200   - public Set<TenantId> getIsolatedTenants(ServiceType serviceType) {
201   - throw new RuntimeException("Not Implemented!");
202   - }
203   -
204 199 private Map<ServiceQueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
205 200 final Map<ServiceQueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
206 201 services.forEach(serviceInfo -> {
... ...
... ... @@ -47,8 +47,6 @@ public interface PartitionService {
47 47 */
48 48 Set<String> getAllServiceIds(ServiceType serviceType);
49 49
50   - Set<TenantId> getIsolatedTenants(ServiceType serviceType);
51   -
52 50 /**
53 51 * Each Service should start a consumer for messages that target individual service instance based on serviceId.
54 52 * This topic is likely to have single partition, and is always assigned to the service.
... ...
... ... @@ -79,7 +79,7 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
79 79 if (callback != null) {
80 80 callback.onFailure(exception);
81 81 } else {
82   - log.warn("Producer template failure", exception);
  82 + log.warn("Producer template failure: {}", exception.getMessage(), exception);
83 83 }
84 84 }
85 85 });
... ...
... ... @@ -245,16 +245,6 @@ message TransportToRuleEngineMsg {
245 245 ToServerRpcRequestMsg toServerRPCCallRequest = 5;
246 246 }
247 247
248   -message DeviceActorToTransportMsg {
249   - int64 sessionIdMSB = 1;
250   - int64 sessionIdLSB = 2;
251   - SessionCloseNotificationProto sessionCloseNotification = 3;
252   - GetAttributeResponseMsg getAttributesResponse = 4;
253   - AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
254   - ToDeviceRpcRequestMsg toDeviceRequest = 6;
255   - ToServerRpcResponseMsg toServerResponse = 7;
256   -}
257   -
258 248 /**
259 249 * TB Core Data Structures
260 250 */
... ... @@ -410,5 +400,11 @@ message ToRuleEngineNotificationMsg {
410 400
411 401 /* Messages that are handled by ThingsBoard Transport Service */
412 402 message ToTransportMsg {
413   - DeviceActorToTransportMsg toDeviceSessionMsg = 1;
  403 + int64 sessionIdMSB = 1;
  404 + int64 sessionIdLSB = 2;
  405 + SessionCloseNotificationProto sessionCloseNotification = 3;
  406 + GetAttributeResponseMsg getAttributesResponse = 4;
  407 + AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
  408 + ToDeviceRpcRequestMsg toDeviceRequest = 6;
  409 + ToServerRpcResponseMsg toServerResponse = 7;
414 410 }
... ...
... ... @@ -303,6 +303,8 @@ public class CoapTransportResource extends CoapResource {
303 303 .setDeviceIdLSB(deviceInfoProto.getDeviceIdLSB())
304 304 .setSessionIdMSB(sessionId.getMostSignificantBits())
305 305 .setSessionIdLSB(sessionId.getLeastSignificantBits())
  306 + .setDeviceName(msg.getDeviceInfo().getDeviceName())
  307 + .setDeviceType(msg.getDeviceInfo().getDeviceType())
306 308 .build();
307 309 onSuccess.accept(sessionInfo);
308 310 } else {
... ...
... ... @@ -221,6 +221,8 @@ public class DeviceApiController {
221 221 .setDeviceIdLSB(deviceInfoProto.getDeviceIdLSB())
222 222 .setSessionIdMSB(sessionId.getMostSignificantBits())
223 223 .setSessionIdLSB(sessionId.getLeastSignificantBits())
  224 + .setDeviceName(msg.getDeviceInfo().getDeviceName())
  225 + .setDeviceType(msg.getDeviceInfo().getDeviceType())
224 226 .build();
225 227 onSuccess.accept(sessionInfo);
226 228 } else {
... ...
... ... @@ -507,6 +507,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
507 507 .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
508 508 .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
509 509 .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
  510 + .setDeviceName(msg.getDeviceInfo().getDeviceName())
  511 + .setDeviceType(msg.getDeviceInfo().getDeviceType())
510 512 .build();
511 513 transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), null);
512 514 transportService.registerAsyncSession(sessionInfo, this);
... ...
... ... @@ -44,6 +44,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
44 44 .setDeviceIdLSB(deviceInfo.getDeviceIdLSB())
45 45 .setTenantIdMSB(deviceInfo.getTenantIdMSB())
46 46 .setTenantIdLSB(deviceInfo.getTenantIdLSB())
  47 + .setDeviceName(deviceInfo.getDeviceName())
  48 + .setDeviceType(deviceInfo.getDeviceType())
47 49 .build();
48 50 setDeviceInfo(deviceInfo);
49 51 }
... ...
... ... @@ -18,36 +18,25 @@ package org.thingsboard.server.common.transport.service;
18 18 import com.google.gson.Gson;
19 19 import com.google.gson.JsonObject;
20 20 import lombok.extern.slf4j.Slf4j;
21   -import org.springframework.beans.factory.annotation.Autowired;
22 21 import org.springframework.beans.factory.annotation.Value;
23 22 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
24 23 import org.springframework.stereotype.Service;
25 24 import org.thingsboard.common.util.ThingsBoardThreadFactory;
26   -import org.thingsboard.server.common.msg.TbMsg;
27   -import org.thingsboard.server.common.msg.TbMsgMetaData;
28   -import org.thingsboard.server.common.msg.session.SessionMsgType;
29   -import org.thingsboard.server.common.transport.util.JsonUtils;
30   -import org.thingsboard.server.queue.TbQueueCallback;
31   -import org.thingsboard.server.queue.TbQueueConsumer;
32   -import org.thingsboard.server.queue.TbQueueMsgMetadata;
33   -import org.thingsboard.server.queue.TbQueueProducer;
34   -import org.thingsboard.server.queue.TbQueueRequestTemplate;
35   -import org.thingsboard.server.queue.common.TbProtoQueueMsg;
36 25 import org.thingsboard.server.common.data.EntityType;
37 26 import org.thingsboard.server.common.data.id.DeviceId;
38 27 import org.thingsboard.server.common.data.id.TenantId;
  28 +import org.thingsboard.server.common.msg.TbMsg;
  29 +import org.thingsboard.server.common.msg.TbMsgDataType;
  30 +import org.thingsboard.server.common.msg.TbMsgMetaData;
  31 +import org.thingsboard.server.common.msg.queue.ServiceType;
  32 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  33 +import org.thingsboard.server.common.msg.session.SessionMsgType;
39 34 import org.thingsboard.server.common.msg.tools.TbRateLimits;
40 35 import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
41 36 import org.thingsboard.server.common.transport.SessionMsgListener;
42 37 import org.thingsboard.server.common.transport.TransportService;
43 38 import org.thingsboard.server.common.transport.TransportServiceCallback;
44   -import org.thingsboard.server.queue.discovery.PartitionService;
45   -import org.thingsboard.server.common.msg.queue.ServiceType;
46   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
47   -import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
48   -import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
49   -import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
50   -import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
  39 +import org.thingsboard.server.common.transport.util.JsonUtils;
51 40 import org.thingsboard.server.gen.transport.TransportProtos;
52 41 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
53 42 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
... ... @@ -55,11 +44,23 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
55 44 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
56 45 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
57 46 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
  47 +import org.thingsboard.server.queue.TbQueueCallback;
  48 +import org.thingsboard.server.queue.TbQueueConsumer;
  49 +import org.thingsboard.server.queue.TbQueueMsgMetadata;
  50 +import org.thingsboard.server.queue.TbQueueProducer;
  51 +import org.thingsboard.server.queue.TbQueueRequestTemplate;
58 52 import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
  53 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  54 +import org.thingsboard.server.queue.discovery.PartitionService;
  55 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  56 +import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
  57 +import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
59 58
60 59 import javax.annotation.PostConstruct;
61 60 import javax.annotation.PreDestroy;
  61 +import java.util.Collections;
62 62 import java.util.List;
  63 +import java.util.Map;
63 64 import java.util.Random;
64 65 import java.util.UUID;
65 66 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -90,6 +91,8 @@ public class DefaultTransportService implements TransportService {
90 91 private long sessionInactivityTimeout;
91 92 @Value("${transport.sessions.report_timeout}")
92 93 private long sessionReportTimeout;
  94 + @Value("${transport.client_side_rpc.timeout:60000}")
  95 + private long clientSideRpcTimeout;
93 96 @Value("${queue.transport.poll_interval}")
94 97 private int notificationsPollDuration;
95 98
... ... @@ -97,6 +100,7 @@ public class DefaultTransportService implements TransportService {
97 100 private final TbTransportQueueFactory queueProvider;
98 101 private final TbQueueProducerProvider producerProvider;
99 102 private final PartitionService partitionService;
  103 + private final TbServiceInfoProvider serviceInfoProvider;
100 104
101 105 protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
102 106 protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
... ... @@ -106,15 +110,17 @@ public class DefaultTransportService implements TransportService {
106 110 protected ScheduledExecutorService schedulerExecutor;
107 111 protected ExecutorService transportCallbackExecutor;
108 112
109   - private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
  113 + private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
  114 + private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>();
110 115 //TODO: Implement cleanup of this maps.
111   - private ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
112   - private ConcurrentMap<DeviceId, TbRateLimits> perDeviceLimits = new ConcurrentHashMap<>();
  116 + private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
  117 + private final ConcurrentMap<DeviceId, TbRateLimits> perDeviceLimits = new ConcurrentHashMap<>();
113 118
114 119 private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
115 120 private volatile boolean stopped = false;
116 121
117   - public DefaultTransportService(TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService) {
  122 + public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService) {
  123 + this.serviceInfoProvider = serviceInfoProvider;
118 124 this.queueProvider = queueProvider;
119 125 this.producerProvider = producerProvider;
120 126 this.partitionService = partitionService;
... ... @@ -134,7 +140,8 @@ public class DefaultTransportService implements TransportService {
134 140 ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
135 141 tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
136 142 transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
137   - transportNotificationsConsumer.subscribe();
  143 + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
  144 + transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
138 145 transportApiRequestTemplate.init();
139 146 mainConsumerExecutor.execute(() -> {
140 147 while (!stopped) {
... ... @@ -145,10 +152,7 @@ public class DefaultTransportService implements TransportService {
145 152 }
146 153 records.forEach(record -> {
147 154 try {
148   - ToTransportMsg toTransportMsg = record.getValue();
149   - if (toTransportMsg.hasToDeviceSessionMsg()) {
150   - processToTransportMsg(toTransportMsg.getToDeviceSessionMsg());
151   - }
  155 + processToTransportMsg(record.getValue());
152 156 } catch (Throwable e) {
153 157 log.warn("Failed to process the notification.", e);
154 158 }
... ... @@ -195,7 +199,7 @@ public class DefaultTransportService implements TransportService {
195 199
196 200 @Override
197 201 public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
198   - sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
  202 + sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
199 203 }
200 204
201 205 @Override
... ... @@ -210,22 +214,6 @@ public class DefaultTransportService implements TransportService {
210 214 }
211 215 }
212 216
213   - public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
214   -
215   - TransportProtos.GetTenantRoutingInfoRequestMsg msg = TransportProtos.GetTenantRoutingInfoRequestMsg.newBuilder()
216   - .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
217   - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
218   - .build();
219   - TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetTenantRoutingInfoRequestMsg(msg).build());
220   - try {
221   - TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
222   - TransportProtos.GetTenantRoutingInfoResponseMsg routingInfo = response.getValue().getGetTenantRoutingInfoResponseMsg();
223   - return new TenantRoutingInfo(tenantId, routingInfo.getIsolatedTbCore(), routingInfo.getIsolatedTbRuleEngine());
224   - } catch (InterruptedException | ExecutionException e) {
225   - throw new RuntimeException(e);
226   - }
227   - }
228   -
229 217 @Override
230 218 public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
231 219 log.trace("Processing msg: {}", msg);
... ... @@ -253,7 +241,7 @@ public class DefaultTransportService implements TransportService {
253 241 @Override
254 242 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
255 243 if (log.isTraceEnabled()) {
256   - log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
  244 + log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg);
257 245 }
258 246 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
259 247 .setSubscriptionInfo(msg).build(), callback);
... ... @@ -340,13 +328,51 @@ public class DefaultTransportService implements TransportService {
340 328 }
341 329 }
342 330
343   - //TODO 2.5: Need to handle timeouts on the transport level and not on the Device Actor Level.
  331 + private void processTimeout(String requestId) {
  332 + RpcRequestMetadata data = toServerRpcPendingMap.remove(requestId);
  333 + if (data != null) {
  334 + SessionMetaData md = sessions.get(data.getSessionId());
  335 + if (md != null) {
  336 + SessionMsgListener listener = md.getListener();
  337 + transportCallbackExecutor.submit(() -> {
  338 + TransportProtos.ToServerRpcResponseMsg responseMsg =
  339 + TransportProtos.ToServerRpcResponseMsg.newBuilder()
  340 + .setRequestId(data.getRequestId())
  341 + .setError("timeout").build();
  342 + listener.onToServerRpcResponse(responseMsg);
  343 + });
  344 + if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
  345 + deregisterSession(md.getSessionInfo());
  346 + }
  347 + } else {
  348 + log.debug("[{}] Missing session.", data.getSessionId());
  349 + }
  350 + }
  351 + }
  352 +
344 353 @Override
345 354 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
346 355 if (checkLimits(sessionInfo, msg, callback)) {
347 356 reportActivityInternal(sessionInfo);
348   -// sendToRuleEngine(sessionInfo, TransportToRuleEngineMsg.newBuilder().setSessionInfo(sessionInfo).
349   -// setToServerRPCCallRequest(msg).build(), new TransportTbQueueCallback(callback));
  357 + UUID sessionId = toSessionId(sessionInfo);
  358 + TenantId tenantId = getTenantId(sessionInfo);
  359 + DeviceId deviceId = getDeviceId(sessionInfo);
  360 + JsonObject json = new JsonObject();
  361 + json.addProperty("method", msg.getMethodName());
  362 + json.add("params", JsonUtils.parse(msg.getParams()));
  363 +
  364 + TbMsgMetaData metaData = new TbMsgMetaData();
  365 + metaData.putValue("deviceName", sessionInfo.getDeviceName());
  366 + metaData.putValue("deviceType", sessionInfo.getDeviceType());
  367 + metaData.putValue("requestId", Integer.toString(msg.getRequestId()));
  368 + metaData.putValue("serviceId", serviceInfoProvider.getServiceId());
  369 + metaData.putValue("sessionId", sessionId.toString());
  370 + TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.TO_SERVER_RPC_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json));
  371 + sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
  372 +
  373 + String requestId = sessionId + "-" + msg.getRequestId();
  374 + toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId()));
  375 + schedulerExecutor.schedule(() -> processTimeout(requestId), clientSideRpcTimeout, TimeUnit.MILLISECONDS);
350 376 }
351 377 }
352 378
... ... @@ -364,7 +390,7 @@ public class DefaultTransportService implements TransportService {
364 390 }
365 391
366 392 private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
367   - UUID sessionId = toId(sessionInfo);
  393 + UUID sessionId = toSessionId(sessionInfo);
368 394 SessionMetaData sessionMetaData = sessions.get(sessionId);
369 395 if (sessionMetaData != null) {
370 396 sessionMetaData.updateLastActivityTime();
... ... @@ -377,7 +403,7 @@ public class DefaultTransportService implements TransportService {
377 403 sessions.forEach((uuid, sessionMD) -> {
378 404 if (sessionMD.getLastActivityTime() < expTime) {
379 405 if (log.isDebugEnabled()) {
380   - log.debug("[{}] Session has expired due to last activity time: {}", toId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime());
  406 + log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime());
381 407 }
382 408 process(sessionMD.getSessionInfo(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
383 409 sessions.remove(uuid);
... ... @@ -407,7 +433,7 @@ public class DefaultTransportService implements TransportService {
407 433 @Override
408 434 public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
409 435 SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener);
410   - sessions.putIfAbsent(toId(sessionInfo), currentSession);
  436 + sessions.putIfAbsent(toSessionId(sessionInfo), currentSession);
411 437
412 438 ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> {
413 439 listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
... ... @@ -419,18 +445,18 @@ public class DefaultTransportService implements TransportService {
419 445
420 446 @Override
421 447 public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) {
422   - SessionMetaData currentSession = sessions.get(toId(sessionInfo));
  448 + SessionMetaData currentSession = sessions.get(toSessionId(sessionInfo));
423 449 if (currentSession != null && currentSession.hasScheduledFuture()) {
424 450 log.debug("Stopping scheduler to avoid resending response if request has been ack.");
425 451 currentSession.getScheduledFuture().cancel(false);
426 452 }
427   - sessions.remove(toId(sessionInfo));
  453 + sessions.remove(toSessionId(sessionInfo));
428 454 }
429 455
430 456 @Override
431 457 public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
432 458 if (log.isTraceEnabled()) {
433   - log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
  459 + log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg);
434 460 }
435 461 if (!rateLimitEnabled) {
436 462 return true;
... ... @@ -442,7 +468,7 @@ public class DefaultTransportService implements TransportService {
442 468 callback.onError(new TbRateLimitsException(EntityType.TENANT));
443 469 }
444 470 if (log.isTraceEnabled()) {
445   - log.trace("[{}][{}] Tenant level rate limit detected: {}", toId(sessionInfo), tenantId, msg);
  471 + log.trace("[{}][{}] Tenant level rate limit detected: {}", toSessionId(sessionInfo), tenantId, msg);
446 472 }
447 473 return false;
448 474 }
... ... @@ -453,7 +479,7 @@ public class DefaultTransportService implements TransportService {
453 479 callback.onError(new TbRateLimitsException(EntityType.DEVICE));
454 480 }
455 481 if (log.isTraceEnabled()) {
456   - log.trace("[{}][{}] Device level rate limit detected: {}", toId(sessionInfo), deviceId, msg);
  482 + log.trace("[{}][{}] Device level rate limit detected: {}", toSessionId(sessionInfo), deviceId, msg);
457 483 }
458 484 return false;
459 485 }
... ... @@ -461,7 +487,7 @@ public class DefaultTransportService implements TransportService {
461 487 return true;
462 488 }
463 489
464   - protected void processToTransportMsg(TransportProtos.DeviceActorToTransportMsg toSessionMsg) {
  490 + protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) {
465 491 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
466 492 SessionMetaData md = sessions.get(sessionId);
467 493 if (md != null) {
... ... @@ -480,6 +506,8 @@ public class DefaultTransportService implements TransportService {
480 506 listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
481 507 }
482 508 if (toSessionMsg.hasToServerResponse()) {
  509 + String requestId = sessionId + "-" + toSessionMsg.getToServerResponse().getRequestId();
  510 + toServerRpcPendingMap.remove(requestId);
483 511 listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
484 512 }
485 513 });
... ... @@ -492,7 +520,7 @@ public class DefaultTransportService implements TransportService {
492 520 }
493 521 }
494 522
495   - protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
  523 + protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) {
496 524 return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
497 525 }
498 526
... ...
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RpcRequestMetadata.java renamed from common/message/src/main/java/org/thingsboard/server/common/msg/core/ToServerRpcResponseMsg.java
... ... @@ -13,20 +13,14 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.common.msg.core;
  16 +package org.thingsboard.server.common.transport.service;
17 17
18 18 import lombok.Data;
19 19
20   -/**
21   - * @author Andrew Shvayka
22   - */
23   -@Data
24   -public class ToServerRpcResponseMsg {
  20 +import java.util.UUID;
25 21
  22 +@Data
  23 +public class RpcRequestMetadata {
  24 + private final UUID sessionId;
26 25 private final int requestId;
27   - private final String data;
28   -
29   - public boolean isSuccess() {
30   - return true;
31   - }
32 26 }
... ...
... ... @@ -16,6 +16,8 @@
16 16 package org.thingsboard.rule.engine.api;
17 17
18 18 import org.thingsboard.server.common.data.id.DeviceId;
  19 +
  20 +import java.util.UUID;
19 21 import java.util.function.Consumer;
20 22
21 23 /**
... ... @@ -23,7 +25,7 @@ import java.util.function.Consumer;
23 25 */
24 26 public interface RuleEngineRpcService {
25 27
26   - void sendRpcReplyToDevice(DeviceId deviceId, int requestId, String body);
  28 + void sendRpcReplyToDevice(String serviceId, UUID sessionId, int requestId, String body);
27 29
28 30 void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest request, Consumer<RuleEngineDeviceRpcResponse> consumer);
29 31
... ...
... ... @@ -28,6 +28,8 @@ import org.thingsboard.server.common.data.id.DeviceId;
28 28 import org.thingsboard.server.common.data.plugin.ComponentType;
29 29 import org.thingsboard.server.common.msg.TbMsg;
30 30
  31 +import java.util.UUID;
  32 +
31 33 @Slf4j
32 34 @RuleNode(
33 35 type = ComponentType.ACTION,
... ... @@ -50,15 +52,21 @@ public class TbSendRPCReplyNode implements TbNode {
50 52
51 53 @Override
52 54 public void onMsg(TbContext ctx, TbMsg msg) {
  55 + String serviceIdStr = msg.getMetaData().getValue(config.getServiceIdMetaDataAttribute());
  56 + String sessionIdStr = msg.getMetaData().getValue(config.getSessionIdMetaDataAttribute());
53 57 String requestIdStr = msg.getMetaData().getValue(config.getRequestIdMetaDataAttribute());
54 58 if (msg.getOriginator().getEntityType() != EntityType.DEVICE) {
55 59 ctx.tellFailure(msg, new RuntimeException("Message originator is not a device entity!"));
56 60 } else if (StringUtils.isEmpty(requestIdStr)) {
57 61 ctx.tellFailure(msg, new RuntimeException("Request id is not present in the metadata!"));
  62 + } else if (StringUtils.isEmpty(serviceIdStr)) {
  63 + ctx.tellFailure(msg, new RuntimeException("Service id is not present in the metadata!"));
  64 + } else if (StringUtils.isEmpty(sessionIdStr)) {
  65 + ctx.tellFailure(msg, new RuntimeException("Session id is not present in the metadata!"));
58 66 } else if (StringUtils.isEmpty(msg.getData())) {
59 67 ctx.tellFailure(msg, new RuntimeException("Request body is empty!"));
60 68 } else {
61   - ctx.getRpcService().sendRpcReplyToDevice(new DeviceId(msg.getOriginator().getId()), Integer.parseInt(requestIdStr), msg.getData());
  69 + ctx.getRpcService().sendRpcReplyToDevice(serviceIdStr, UUID.fromString(sessionIdStr), Integer.parseInt(requestIdStr), msg.getData());
62 70 ctx.tellSuccess(msg);
63 71 }
64 72 }
... ...
... ... @@ -16,18 +16,40 @@
16 16 package org.thingsboard.rule.engine.rpc;
17 17
18 18 import lombok.Data;
  19 +import org.springframework.util.StringUtils;
19 20 import org.thingsboard.rule.engine.api.NodeConfiguration;
20 21 import org.thingsboard.server.common.data.DataConstants;
21 22
22 23 @Data
23 24 public class TbSendRpcReplyNodeConfiguration implements NodeConfiguration<TbSendRpcReplyNodeConfiguration> {
24 25
  26 + public static final String SERVICE_ID = "serviceId";
  27 + public static final String SESSION_ID = "sessionId";
  28 + public static final String REQUEST_ID = "requestId";
  29 +
  30 + private String serviceIdMetaDataAttribute;
  31 + private String sessionIdMetaDataAttribute;
25 32 private String requestIdMetaDataAttribute;
26 33
27 34 @Override
28 35 public TbSendRpcReplyNodeConfiguration defaultConfiguration() {
29 36 TbSendRpcReplyNodeConfiguration configuration = new TbSendRpcReplyNodeConfiguration();
30   - configuration.setRequestIdMetaDataAttribute("requestId");
  37 + configuration.setServiceIdMetaDataAttribute(SERVICE_ID);
  38 + configuration.setSessionIdMetaDataAttribute(SESSION_ID);
  39 + configuration.setRequestIdMetaDataAttribute(REQUEST_ID);
31 40 return configuration;
32 41 }
  42 +
  43 + public String getServiceIdMetaDataAttribute() {
  44 + return !StringUtils.isEmpty(serviceIdMetaDataAttribute) ? serviceIdMetaDataAttribute : SERVICE_ID;
  45 + }
  46 +
  47 + public String getSessionIdMetaDataAttribute() {
  48 + return !StringUtils.isEmpty(sessionIdMetaDataAttribute) ? sessionIdMetaDataAttribute : SESSION_ID;
  49 + }
  50 +
  51 + public String getRequestIdMetaDataAttribute() {
  52 + return !StringUtils.isEmpty(requestIdMetaDataAttribute) ? requestIdMetaDataAttribute : REQUEST_ID;
  53 + }
33 54 }
  55 +
... ...
... ... @@ -81,15 +81,15 @@ queue:
81 81 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
82 82 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
83 83 transport_api:
84   - requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
85   - responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
  84 + requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
  85 + responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
86 86 max_pending_requests: "${TB_QUEUE_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
87 87 max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
88 88 max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}"
89 89 request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
90 90 response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
91 91 core:
92   - topic: "${TB_QUEUE_CORE_TOPIC:tb.core}"
  92 + topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
93 93 poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
94 94 partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
95 95 pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -98,9 +98,9 @@ queue:
98 98 print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
99 99 js:
100 100 # JS Eval request topic
101   - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
  101 + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
102 102 # JS Eval responses topic prefix that is combined with node id
103   - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}"
  103 + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}"
104 104 # JS Eval max pending requests
105 105 max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
106 106 # JS Eval max request timeout
... ... @@ -110,7 +110,7 @@ queue:
110 110 # JS response auto commit interval
111 111 response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
112 112 rule-engine:
113   - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}"
  113 + topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
114 114 poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
115 115 pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
116 116 stats:
... ... @@ -118,7 +118,7 @@ queue:
118 118 print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
119 119 queues:
120 120 - name: "Main"
121   - topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}"
  121 + topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
122 122 poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
123 123 partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
124 124 pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -133,7 +133,7 @@ queue:
133 133 failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
134 134 pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
135 135 - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
136   - topic: "${TB_QUEUE_RE_HP_TOPIC:tb.rule-engine.hp}"
  136 + topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
137 137 poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
138 138 partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
139 139 pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -149,7 +149,7 @@ queue:
149 149 pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
150 150 transport:
151 151 # For high priority notifications that require minimum latency and processing time
152   - notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
  152 + notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
153 153 poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
154 154
155 155 service:
... ...
... ... @@ -82,15 +82,15 @@ queue:
82 82 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
83 83 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
84 84 transport_api:
85   - requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
86   - responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
  85 + requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
  86 + responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
87 87 max_pending_requests: "${TB_QUEUE_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
88 88 max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
89 89 max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}"
90 90 request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
91 91 response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
92 92 core:
93   - topic: "${TB_QUEUE_CORE_TOPIC:tb.core}"
  93 + topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
94 94 poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
95 95 partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
96 96 pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -99,9 +99,9 @@ queue:
99 99 print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
100 100 js:
101 101 # JS Eval request topic
102   - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
  102 + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
103 103 # JS Eval responses topic prefix that is combined with node id
104   - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}"
  104 + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}"
105 105 # JS Eval max pending requests
106 106 max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
107 107 # JS Eval max request timeout
... ... @@ -111,7 +111,7 @@ queue:
111 111 # JS response auto commit interval
112 112 response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
113 113 rule-engine:
114   - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}"
  114 + topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
115 115 poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
116 116 pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
117 117 stats:
... ... @@ -119,7 +119,7 @@ queue:
119 119 print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
120 120 queues:
121 121 - name: "Main"
122   - topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}"
  122 + topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
123 123 poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
124 124 partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
125 125 pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -134,7 +134,7 @@ queue:
134 134 failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
135 135 pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
136 136 - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
137   - topic: "${TB_QUEUE_RE_HP_TOPIC:tb.rule-engine.hp}"
  137 + topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
138 138 poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
139 139 partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
140 140 pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -150,7 +150,7 @@ queue:
150 150 pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
151 151 transport:
152 152 # For high priority notifications that require minimum latency and processing time
153   - notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
  153 + notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
154 154 poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
155 155
156 156 service:
... ...
... ... @@ -17,10 +17,20 @@
17 17 spring.main.web-environment: false
18 18 spring.main.web-application-type: none
19 19
20   -# Clustering properties
21   -cluster:
22   - # Unique id for this node (autogenerated if empty)
23   - node_id: "${CLUSTER_NODE_ID:}"
  20 +# Zookeeper connection parameters. Used for service discovery.
  21 +zk:
  22 + # Enable/disable zookeeper discovery service.
  23 + enabled: "${ZOOKEEPER_ENABLED:false}"
  24 + # Zookeeper connect string
  25 + url: "${ZOOKEEPER_URL:localhost:2181}"
  26 + # Zookeeper retry interval in milliseconds
  27 + retry_interval_ms: "${ZOOKEEPER_RETRY_INTERVAL_MS:3000}"
  28 + # Zookeeper connection timeout in milliseconds
  29 + connection_timeout_ms: "${ZOOKEEPER_CONNECTION_TIMEOUT_MS:3000}"
  30 + # Zookeeper session timeout in milliseconds
  31 + session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
  32 + # Name of the directory in zookeeper 'filesystem'
  33 + zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
24 34
25 35 # MQTT server parameters
26 36 transport:
... ... @@ -102,15 +112,15 @@ queue:
102 112 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
103 113 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
104 114 transport_api:
105   - requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
106   - responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
  115 + requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
  116 + responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
107 117 max_pending_requests: "${TB_QUEUE_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
108 118 max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
109 119 max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}"
110 120 request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
111 121 response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
112 122 core:
113   - topic: "${TB_QUEUE_CORE_TOPIC:tb.core}"
  123 + topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
114 124 poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
115 125 partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
116 126 pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -119,9 +129,9 @@ queue:
119 129 print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
120 130 js:
121 131 # JS Eval request topic
122   - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
  132 + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
123 133 # JS Eval responses topic prefix that is combined with node id
124   - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}"
  134 + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}"
125 135 # JS Eval max pending requests
126 136 max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
127 137 # JS Eval max request timeout
... ... @@ -131,7 +141,7 @@ queue:
131 141 # JS response auto commit interval
132 142 response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
133 143 rule-engine:
134   - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}"
  144 + topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
135 145 poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
136 146 pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
137 147 stats:
... ... @@ -139,7 +149,7 @@ queue:
139 149 print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
140 150 queues:
141 151 - name: "Main"
142   - topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}"
  152 + topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
143 153 poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
144 154 partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
145 155 pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -154,7 +164,7 @@ queue:
154 164 failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
155 165 pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
156 166 - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
157   - topic: "${TB_QUEUE_RE_HP_TOPIC:tb.rule-engine.hp}"
  167 + topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
158 168 poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
159 169 partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
160 170 pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
... ... @@ -170,7 +180,7 @@ queue:
170 180 pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
171 181 transport:
172 182 # For high priority notifications that require minimum latency and processing time
173   - notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
  183 + notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
174 184 poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
175 185
176 186 service:
... ...