Commit 962755735b55cef48e8148664014487bde74b02b

Authored by Andrii Shvaika
1 parent c1b5fd5c

Fix for Server-side RPC in cluster mode

@@ -23,6 +23,7 @@ import org.springframework.stereotype.Service; @@ -23,6 +23,7 @@ import org.springframework.stereotype.Service;
23 import org.thingsboard.rule.engine.api.RpcError; 23 import org.thingsboard.rule.engine.api.RpcError;
24 import org.thingsboard.server.actors.ActorSystemContext; 24 import org.thingsboard.server.actors.ActorSystemContext;
25 import org.thingsboard.server.common.data.id.TenantId; 25 import org.thingsboard.server.common.data.id.TenantId;
  26 +import org.thingsboard.server.common.msg.MsgType;
26 import org.thingsboard.server.common.msg.TbActorMsg; 27 import org.thingsboard.server.common.msg.TbActorMsg;
27 import org.thingsboard.server.common.msg.queue.ServiceType; 28 import org.thingsboard.server.common.msg.queue.ServiceType;
28 import org.thingsboard.server.common.msg.queue.TbCallback; 29 import org.thingsboard.server.common.msg.queue.TbCallback;
@@ -45,6 +46,7 @@ import org.thingsboard.server.service.encoding.DataDecodingEncodingService; @@ -45,6 +46,7 @@ import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
45 import org.thingsboard.server.service.queue.processing.AbstractConsumerService; 46 import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
46 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; 47 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
47 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; 48 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
  49 +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
48 import org.thingsboard.server.service.state.DeviceStateService; 50 import org.thingsboard.server.service.state.DeviceStateService;
49 import org.thingsboard.server.service.subscription.SubscriptionManagerService; 51 import org.thingsboard.server.service.subscription.SubscriptionManagerService;
50 import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; 52 import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
@@ -100,7 +102,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -100,7 +102,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
100 } 102 }
101 103
102 @PreDestroy 104 @PreDestroy
103 - public void destroy(){ 105 + public void destroy() {
104 super.destroy(); 106 super.destroy();
105 } 107 }
106 108
@@ -143,8 +145,13 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -143,8 +145,13 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
143 } else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) { 145 } else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
144 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); 146 Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
145 if (actorMsg.isPresent()) { 147 if (actorMsg.isPresent()) {
146 - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());  
147 - actorContext.tell(actorMsg.get(), ActorRef.noSender()); 148 + TbActorMsg tbActorMsg = actorMsg.get();
  149 + if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
  150 + tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg);
  151 + } else {
  152 + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
  153 + actorContext.tell(actorMsg.get(), ActorRef.noSender());
  154 + }
148 } 155 }
149 callback.onSuccess(); 156 callback.onSuccess();
150 } 157 }
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
21 import org.springframework.beans.factory.annotation.Value; 21 import org.springframework.beans.factory.annotation.Value;
22 import org.springframework.scheduling.annotation.Scheduled; 22 import org.springframework.scheduling.annotation.Scheduled;
23 import org.springframework.stereotype.Service; 23 import org.springframework.stereotype.Service;
  24 +import org.thingsboard.rule.engine.api.RpcError;
24 import org.thingsboard.server.actors.ActorSystemContext; 25 import org.thingsboard.server.actors.ActorSystemContext;
25 import org.thingsboard.server.common.data.id.TenantId; 26 import org.thingsboard.server.common.data.id.TenantId;
26 import org.thingsboard.server.common.msg.TbActorMsg; 27 import org.thingsboard.server.common.msg.TbActorMsg;
@@ -31,6 +32,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue; @@ -31,6 +32,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue;
31 import org.thingsboard.server.common.msg.queue.ServiceType; 32 import org.thingsboard.server.common.msg.queue.ServiceType;
32 import org.thingsboard.server.common.msg.queue.TbCallback; 33 import org.thingsboard.server.common.msg.queue.TbCallback;
33 import org.thingsboard.server.common.msg.queue.TbMsgCallback; 34 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
  35 +import org.thingsboard.server.gen.transport.TransportProtos;
34 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 36 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
35 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 37 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
36 import org.thingsboard.server.queue.TbQueueConsumer; 38 import org.thingsboard.server.queue.TbQueueConsumer;
@@ -48,6 +50,9 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStr @@ -48,6 +50,9 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStr
48 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; 50 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
49 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; 51 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
50 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory; 52 import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
  53 +import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
  54 +import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
  55 +import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
51 import org.thingsboard.server.service.stats.RuleEngineStatisticsService; 56 import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
52 57
53 import javax.annotation.PostConstruct; 58 import javax.annotation.PostConstruct;
@@ -81,6 +86,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -81,6 +86,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
81 private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory; 86 private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
82 private final TbQueueRuleEngineSettings ruleEngineSettings; 87 private final TbQueueRuleEngineSettings ruleEngineSettings;
83 private final RuleEngineStatisticsService statisticsService; 88 private final RuleEngineStatisticsService statisticsService;
  89 + private final TbRuleEngineDeviceRpcService tbDeviceRpcService;
84 private final ConcurrentMap<String, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>(); 90 private final ConcurrentMap<String, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
85 private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>(); 91 private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>();
86 private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>(); 92 private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
@@ -90,13 +96,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -90,13 +96,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
90 TbRuleEngineSubmitStrategyFactory submitStrategyFactory, 96 TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
91 TbQueueRuleEngineSettings ruleEngineSettings, 97 TbQueueRuleEngineSettings ruleEngineSettings,
92 TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService, 98 TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
93 - ActorSystemContext actorContext, DataDecodingEncodingService encodingService) { 99 + ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
  100 + TbRuleEngineDeviceRpcService tbDeviceRpcService) {
94 super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); 101 super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
95 this.statisticsService = statisticsService; 102 this.statisticsService = statisticsService;
96 this.ruleEngineSettings = ruleEngineSettings; 103 this.ruleEngineSettings = ruleEngineSettings;
97 this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; 104 this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
98 this.submitStrategyFactory = submitStrategyFactory; 105 this.submitStrategyFactory = submitStrategyFactory;
99 this.processingStrategyFactory = processingStrategyFactory; 106 this.processingStrategyFactory = processingStrategyFactory;
  107 + this.tbDeviceRpcService = tbDeviceRpcService;
100 } 108 }
101 109
102 @PostConstruct 110 @PostConstruct
@@ -227,7 +235,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @@ -227,7 +235,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
227 actorContext.tell(actorMsg.get(), ActorRef.noSender()); 235 actorContext.tell(actorMsg.get(), ActorRef.noSender());
228 } 236 }
229 callback.onSuccess(); 237 callback.onSuccess();
  238 + } else if (nfMsg.hasFromDeviceRpcResponse()) {
  239 + TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse();
  240 + RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
  241 + FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB())
  242 + , proto.getResponse(), error);
  243 + tbDeviceRpcService.processRpcResponseFromDevice(response);
  244 + callback.onSuccess();
230 } else { 245 } else {
  246 + log.trace("Received notification with missing handler");
231 callback.onSuccess(); 247 callback.onSuccess();
232 } 248 }
233 } 249 }