Commit aa4787e9891c1ab332568017579eb53fe0fbeff9

Authored by Andrew Shvayka
1 parent 35e91d27

Fix to RPC logic

@@ -41,7 +41,6 @@ import org.thingsboard.server.common.msg.TbMsg; @@ -41,7 +41,6 @@ import org.thingsboard.server.common.msg.TbMsg;
41 import org.thingsboard.server.common.msg.TbMsgDataType; 41 import org.thingsboard.server.common.msg.TbMsgDataType;
42 import org.thingsboard.server.common.msg.TbMsgMetaData; 42 import org.thingsboard.server.common.msg.TbMsgMetaData;
43 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; 43 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
44 -import org.thingsboard.server.common.msg.cluster.ServerAddress;  
45 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; 44 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
46 import org.thingsboard.server.common.msg.session.SessionMsgType; 45 import org.thingsboard.server.common.msg.session.SessionMsgType;
47 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; 46 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
@@ -152,7 +151,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -152,7 +151,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
152 151
153 if (request.isOneway() && sent) { 152 if (request.isOneway() && sent) {
154 logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); 153 logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
155 - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(msg.getMsg().getId(), msg.getServerAddress(), null, null)); 154 + systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
156 } else { 155 } else {
157 registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout); 156 registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
158 } 157 }
@@ -174,8 +173,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -174,8 +173,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
174 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); 173 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
175 if (requestMd != null) { 174 if (requestMd != null) {
176 logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); 175 logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
177 - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),  
178 - requestMd.getMsg().getServerAddress(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); 176 + systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  177 + null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
179 } 178 }
180 } 179 }
181 180
@@ -207,7 +206,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -207,7 +206,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
207 ToDeviceRpcRequestBody body = request.getBody(); 206 ToDeviceRpcRequestBody body = request.getBody();
208 if (request.isOneway()) { 207 if (request.isOneway()) {
209 sentOneWayIds.add(entry.getKey()); 208 sentOneWayIds.add(entry.getKey());
210 - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null)); 209 + systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null));
211 } 210 }
212 ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId( 211 ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId(
213 entry.getKey()).setMethodName(body.getMethod()).setParams(body.getParams()).build(); 212 entry.getKey()).setMethodName(body.getMethod()).setParams(body.getParams()).build();
@@ -400,8 +399,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -400,8 +399,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
400 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); 399 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
401 boolean success = requestMd != null; 400 boolean success = requestMd != null;
402 if (success) { 401 if (success) {
403 - systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),  
404 - requestMd.getMsg().getServerAddress(), responseMsg.getPayload(), null)); 402 + systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  403 + responseMsg.getPayload(), null));
405 } else { 404 } else {
406 logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); 405 logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
407 } 406 }
@@ -17,6 +17,7 @@ package org.thingsboard.server.actors.ruleChain; @@ -17,6 +17,7 @@ package org.thingsboard.server.actors.ruleChain;
17 17
18 import akka.actor.ActorRef; 18 import akka.actor.ActorRef;
19 import com.datastax.driver.core.utils.UUIDs; 19 import com.datastax.driver.core.utils.UUIDs;
  20 +import org.springframework.util.StringUtils;
20 import org.thingsboard.rule.engine.api.ListeningExecutor; 21 import org.thingsboard.rule.engine.api.ListeningExecutor;
21 import org.thingsboard.rule.engine.api.MailService; 22 import org.thingsboard.rule.engine.api.MailService;
22 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; 23 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
@@ -35,6 +36,8 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; @@ -35,6 +36,8 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
35 import org.thingsboard.server.common.data.rule.RuleNode; 36 import org.thingsboard.server.common.data.rule.RuleNode;
36 import org.thingsboard.server.common.msg.TbMsg; 37 import org.thingsboard.server.common.msg.TbMsg;
37 import org.thingsboard.server.common.msg.TbMsgMetaData; 38 import org.thingsboard.server.common.msg.TbMsgMetaData;
  39 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  40 +import org.thingsboard.server.common.msg.cluster.ServerType;
38 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; 41 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
39 import org.thingsboard.server.dao.alarm.AlarmService; 42 import org.thingsboard.server.dao.alarm.AlarmService;
40 import org.thingsboard.server.dao.asset.AssetService; 43 import org.thingsboard.server.dao.asset.AssetService;
@@ -232,16 +235,22 @@ class DefaultTbContext implements TbContext { @@ -232,16 +235,22 @@ class DefaultTbContext implements TbContext {
232 return new RuleEngineRpcService() { 235 return new RuleEngineRpcService() {
233 @Override 236 @Override
234 public void sendRpcReply(DeviceId deviceId, int requestId, String body) { 237 public void sendRpcReply(DeviceId deviceId, int requestId, String body) {
235 - mainCtx.getDeviceRpcService().sendRpcReplyToDevice(nodeCtx.getTenantId(), deviceId, requestId, body); 238 + mainCtx.getDeviceRpcService().sendReplyToRpcCallFromDevice(nodeCtx.getTenantId(), deviceId, requestId, body);
236 } 239 }
237 240
238 @Override 241 @Override
239 public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) { 242 public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
240 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), nodeCtx.getTenantId(), src.getDeviceId(), 243 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), nodeCtx.getTenantId(), src.getDeviceId(),
241 src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody())); 244 src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
242 - mainCtx.getDeviceRpcService().processRpcRequestToDevice(request, response -> { 245 + mainCtx.getDeviceRpcService().forwardServerSideRPCRequestToDeviceActor(request, response -> {
243 if (src.isRestApiCall()) { 246 if (src.isRestApiCall()) {
244 - mainCtx.getDeviceRpcService().processRestAPIRpcResponseFromRuleEngine(response); 247 + ServerAddress requestOriginAddress;
  248 + if (!StringUtils.isEmpty(src.getOriginHost())) {
  249 + requestOriginAddress = new ServerAddress(src.getOriginHost(), src.getOriginPort(), ServerType.CORE);
  250 + } else {
  251 + requestOriginAddress = mainCtx.getRoutingService().getCurrentServer();
  252 + }
  253 + mainCtx.getDeviceRpcService().processResponseToServerSideRPCRequestFromRuleEngine(requestOriginAddress, response);
245 } 254 }
246 consumer.accept(RuleEngineDeviceRpcResponse.builder() 255 consumer.accept(RuleEngineDeviceRpcResponse.builder()
247 .deviceId(src.getDeviceId()) 256 .deviceId(src.getDeviceId())
@@ -35,5 +35,4 @@ public interface ActorService extends SessionMsgProcessor, RpcMsgListener, Disco @@ -35,5 +35,4 @@ public interface ActorService extends SessionMsgProcessor, RpcMsgListener, Disco
35 35
36 void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType); 36 void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
37 37
38 - void onMsg(ServiceToRuleEngineMsg serviceToRuleEngineMsg);  
39 } 38 }
@@ -42,7 +42,6 @@ import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; @@ -42,7 +42,6 @@ import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
42 import org.thingsboard.server.common.msg.cluster.ServerAddress; 42 import org.thingsboard.server.common.msg.cluster.ServerAddress;
43 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; 43 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
44 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; 44 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
45 -import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;  
46 import org.thingsboard.server.gen.cluster.ClusterAPIProtos; 45 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
47 import org.thingsboard.server.service.cluster.discovery.DiscoveryService; 46 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
48 import org.thingsboard.server.service.cluster.discovery.ServerInstance; 47 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
@@ -159,11 +158,6 @@ public class DefaultActorService implements ActorService { @@ -159,11 +158,6 @@ public class DefaultActorService implements ActorService {
159 appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender()); 158 appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
160 } 159 }
161 160
162 - @Override  
163 - public void onMsg(ServiceToRuleEngineMsg msg) {  
164 - appActor.tell(msg, ActorRef.noSender());  
165 - }  
166 -  
167 public void broadcast(ToAllNodesMsg msg) { 161 public void broadcast(ToAllNodesMsg msg) {
168 actorContext.getEncodingService().encode(msg); 162 actorContext.getEncodingService().encode(msg);
169 rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage 163 rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage
@@ -185,7 +179,7 @@ public class DefaultActorService implements ActorService { @@ -185,7 +179,7 @@ public class DefaultActorService implements ActorService {
185 ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort(), source.getServerType()); 179 ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort(), source.getServerType());
186 if (log.isDebugEnabled()) { 180 if (log.isDebugEnabled()) {
187 log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress); 181 log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress);
188 - log.info("MSG: ", msg); 182 + log.info("MSG: {}", msg);
189 } 183 }
190 switch (msg.getMessageType()) { 184 switch (msg.getMessageType()) {
191 case CLUSTER_ACTOR_MESSAGE: 185 case CLUSTER_ACTOR_MESSAGE:
@@ -219,7 +213,7 @@ public class DefaultActorService implements ActorService { @@ -219,7 +213,7 @@ public class DefaultActorService implements ActorService {
219 actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray()); 213 actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
220 break; 214 break;
221 case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE: 215 case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
222 - actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray()); 216 + actorContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromRemoteServer(serverAddress, msg.getPayload().toByteArray());
223 break; 217 break;
224 case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE: 218 case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE:
225 actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray()); 219 actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray());
@@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.widget.WidgetsBundle; @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.widget.WidgetsBundle;
48 import org.thingsboard.server.common.msg.TbMsg; 48 import org.thingsboard.server.common.msg.TbMsg;
49 import org.thingsboard.server.common.msg.TbMsgDataType; 49 import org.thingsboard.server.common.msg.TbMsgDataType;
50 import org.thingsboard.server.common.msg.TbMsgMetaData; 50 import org.thingsboard.server.common.msg.TbMsgMetaData;
  51 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
51 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 52 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
52 import org.thingsboard.server.dao.alarm.AlarmService; 53 import org.thingsboard.server.dao.alarm.AlarmService;
53 import org.thingsboard.server.dao.asset.AssetService; 54 import org.thingsboard.server.dao.asset.AssetService;
@@ -673,7 +674,7 @@ public abstract class BaseController { @@ -673,7 +674,7 @@ public abstract class BaseController {
673 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, entityId, metaData, TbMsgDataType.JSON 674 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, entityId, metaData, TbMsgDataType.JSON
674 , json.writeValueAsString(entityNode) 675 , json.writeValueAsString(entityNode)
675 , null, null, 0L); 676 , null, null, 0L);
676 - actorService.onMsg(new ServiceToRuleEngineMsg(user.getTenantId(), tbMsg)); 677 + actorService.onMsg(new SendToClusterMsg(entityId, new ServiceToRuleEngineMsg(user.getTenantId(), tbMsg)));
677 } catch (Exception e) { 678 } catch (Exception e) {
678 log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e); 679 log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e);
679 } 680 }
@@ -90,7 +90,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -90,7 +90,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
90 90
91 @Override 91 @Override
92 public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) { 92 public void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
93 - log.trace("[{}][{}] Processing local rpc call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); 93 + log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
94 UUID requestId = request.getId(); 94 UUID requestId = request.getId();
95 localToRuleEngineRpcRequests.put(requestId, responseConsumer); 95 localToRuleEngineRpcRequests.put(requestId, responseConsumer);
96 sendRpcRequestToRuleEngine(request); 96 sendRpcRequestToRuleEngine(request);
@@ -98,31 +98,11 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -98,31 +98,11 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
98 } 98 }
99 99
100 @Override 100 @Override
101 - public void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response) {  
102 - UUID requestId = response.getId();  
103 - Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId);  
104 - if (consumer != null) {  
105 - consumer.accept(response);  
106 - } else {  
107 - log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);  
108 - }  
109 - }  
110 -  
111 - @Override  
112 - public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {  
113 - log.trace("[{}][{}] Processing local rpc call to device [{}]", request.getTenantId(), request.getId(), request.getDeviceId());  
114 - UUID requestId = request.getId();  
115 - localToDeviceRpcRequests.put(requestId, responseConsumer);  
116 - sendRpcRequestToDevice(request);  
117 - scheduleTimeout(request, requestId, localToDeviceRpcRequests);  
118 - }  
119 -  
120 - @Override  
121 - public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {  
122 - log.trace("[{}] Received device RPC response from server: [{}]", response.getId(), response.getServerAddress());  
123 - if (routingService.getCurrentServer().equals(response.getServerAddress())) { 101 + public void processResponseToServerSideRPCRequestFromRuleEngine(ServerAddress requestOriginAddress, FromDeviceRpcResponse response) {
  102 + log.trace("[{}] Received response to server-side RPC request from rule engine: [{}]", response.getId(), requestOriginAddress);
  103 + if (routingService.getCurrentServer().equals(requestOriginAddress)) {
124 UUID requestId = response.getId(); 104 UUID requestId = response.getId();
125 - Consumer<FromDeviceRpcResponse> consumer = localToDeviceRpcRequests.remove(requestId); 105 + Consumer<FromDeviceRpcResponse> consumer = localToRuleEngineRpcRequests.remove(requestId);
126 if (consumer != null) { 106 if (consumer != null) {
127 consumer.accept(response); 107 consumer.accept(response);
128 } else { 108 } else {
@@ -138,12 +118,33 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -138,12 +118,33 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
138 } else { 118 } else {
139 builder.setError(-1); 119 builder.setError(-1);
140 } 120 }
141 - rpcService.tell(response.getServerAddress(), ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray()); 121 + rpcService.tell(requestOriginAddress, ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray());
  122 + }
  123 + }
  124 +
  125 + @Override
  126 + public void forwardServerSideRPCRequestToDeviceActor(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
  127 + log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
  128 + UUID requestId = request.getId();
  129 + localToDeviceRpcRequests.put(requestId, responseConsumer);
  130 + sendRpcRequestToDevice(request);
  131 + scheduleTimeout(request, requestId, localToDeviceRpcRequests);
  132 + }
  133 +
  134 + @Override
  135 + public void processResponseToServerSideRPCRequestFromDeviceActor(FromDeviceRpcResponse response) {
  136 + log.trace("[{}] Received response to server-side RPC request from device actor.", response.getId());
  137 + UUID requestId = response.getId();
  138 + Consumer<FromDeviceRpcResponse> consumer = localToDeviceRpcRequests.remove(requestId);
  139 + if (consumer != null) {
  140 + consumer.accept(response);
  141 + } else {
  142 + log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
142 } 143 }
143 } 144 }
144 145
145 @Override 146 @Override
146 - public void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] data) { 147 + public void processResponseToServerSideRPCRequestFromRemoteServer(ServerAddress serverAddress, byte[] data) {
147 ClusterAPIProtos.FromDeviceRPCResponseProto proto; 148 ClusterAPIProtos.FromDeviceRPCResponseProto proto;
148 try { 149 try {
149 proto = ClusterAPIProtos.FromDeviceRPCResponseProto.parseFrom(data); 150 proto = ClusterAPIProtos.FromDeviceRPCResponseProto.parseFrom(data);
@@ -151,13 +152,12 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -151,13 +152,12 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
151 throw new RuntimeException(e); 152 throw new RuntimeException(e);
152 } 153 }
153 RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; 154 RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
154 - FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), serverAddress,  
155 - proto.getResponse(), error);  
156 - processRpcResponseFromDevice(response); 155 + FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), proto.getResponse(), error);
  156 + processResponseToServerSideRPCRequestFromRuleEngine(routingService.getCurrentServer(), response);
157 } 157 }
158 158
159 @Override 159 @Override
160 - public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) { 160 + public void sendReplyToRpcCallFromDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
161 ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body)); 161 ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
162 forward(deviceId, rpcMsg); 162 forward(deviceId, rpcMsg);
163 } 163 }
@@ -166,6 +166,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -166,6 +166,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
166 ObjectNode entityNode = json.createObjectNode(); 166 ObjectNode entityNode = json.createObjectNode();
167 TbMsgMetaData metaData = new TbMsgMetaData(); 167 TbMsgMetaData metaData = new TbMsgMetaData();
168 metaData.putValue("requestUUID", msg.getId().toString()); 168 metaData.putValue("requestUUID", msg.getId().toString());
  169 + metaData.putValue("originHost", routingService.getCurrentServer().getHost());
  170 + metaData.putValue("originPort", Integer.toString(routingService.getCurrentServer().getPort()));
169 metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime())); 171 metaData.putValue("expirationTime", Long.toString(msg.getExpirationTime()));
170 metaData.putValue("oneway", Boolean.toString(msg.isOneway())); 172 metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
171 173
@@ -176,7 +178,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -176,7 +178,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
176 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON 178 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON
177 , json.writeValueAsString(entityNode) 179 , json.writeValueAsString(entityNode)
178 , null, null, 0L); 180 , null, null, 0L);
179 - actorService.onMsg(new ServiceToRuleEngineMsg(msg.getTenantId(), tbMsg)); 181 + actorService.onMsg(new SendToClusterMsg(msg.getDeviceId(), new ServiceToRuleEngineMsg(msg.getTenantId(), tbMsg)));
180 } catch (JsonProcessingException e) { 182 } catch (JsonProcessingException e) {
181 throw new RuntimeException(e); 183 throw new RuntimeException(e);
182 } 184 }
@@ -199,7 +201,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService { @@ -199,7 +201,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
199 log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId); 201 log.trace("[{}] timeout the request: [{}]", this.hashCode(), requestId);
200 Consumer<FromDeviceRpcResponse> consumer = requestsMap.remove(requestId); 202 Consumer<FromDeviceRpcResponse> consumer = requestsMap.remove(requestId);
201 if (consumer != null) { 203 if (consumer != null) {
202 - consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT)); 204 + consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
203 } 205 }
204 }, timeout, TimeUnit.MILLISECONDS); 206 }, timeout, TimeUnit.MILLISECONDS);
205 } 207 }
@@ -29,13 +29,13 @@ public interface DeviceRpcService { @@ -29,13 +29,13 @@ public interface DeviceRpcService {
29 29
30 void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer); 30 void processRestAPIRpcRequestToRuleEngine(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
31 31
32 - void processRestAPIRpcResponseFromRuleEngine(FromDeviceRpcResponse response); 32 + void processResponseToServerSideRPCRequestFromRuleEngine(ServerAddress requestOriginAddress, FromDeviceRpcResponse response);
33 33
34 - void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer); 34 + void forwardServerSideRPCRequestToDeviceActor(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
35 35
36 - void processRpcResponseFromDevice(FromDeviceRpcResponse response); 36 + void processResponseToServerSideRPCRequestFromDeviceActor(FromDeviceRpcResponse response);
37 37
38 - void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body); 38 + void processResponseToServerSideRPCRequestFromRemoteServer(ServerAddress serverAddress, byte[] data);
39 39
40 - void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] bytes); 40 + void sendReplyToRpcCallFromDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
41 } 41 }
@@ -32,8 +32,6 @@ import java.util.UUID; @@ -32,8 +32,6 @@ import java.util.UUID;
32 public class FromDeviceRpcResponse { 32 public class FromDeviceRpcResponse {
33 @Getter 33 @Getter
34 private final UUID id; 34 private final UUID id;
35 - @Getter  
36 - private final ServerAddress serverAddress;  
37 private final String response; 35 private final String response;
38 private final RpcError error; 36 private final RpcError error;
39 37
@@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; @@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
43 import org.thingsboard.server.common.msg.TbMsg; 43 import org.thingsboard.server.common.msg.TbMsg;
44 import org.thingsboard.server.common.msg.TbMsgDataType; 44 import org.thingsboard.server.common.msg.TbMsgDataType;
45 import org.thingsboard.server.common.msg.TbMsgMetaData; 45 import org.thingsboard.server.common.msg.TbMsgMetaData;
  46 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
46 import org.thingsboard.server.common.msg.cluster.ServerAddress; 47 import org.thingsboard.server.common.msg.cluster.ServerAddress;
47 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 48 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
48 import org.thingsboard.server.dao.attributes.AttributesService; 49 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -457,7 +458,7 @@ public class DefaultDeviceStateService implements DeviceStateService { @@ -457,7 +458,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
457 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON 458 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON
458 , json.writeValueAsString(state) 459 , json.writeValueAsString(state)
459 , null, null, 0L); 460 , null, null, 0L);
460 - actorService.onMsg(new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg)); 461 + actorService.onMsg(new SendToClusterMsg(stateData.getDeviceId(), new ServiceToRuleEngineMsg(stateData.getTenantId(), tbMsg)));
461 } catch (Exception e) { 462 } catch (Exception e) {
462 log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e); 463 log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
463 } 464 }
@@ -22,6 +22,7 @@ option java_outer_classname = "ClusterAPIProtos"; @@ -22,6 +22,7 @@ option java_outer_classname = "ClusterAPIProtos";
22 service ClusterRpcService { 22 service ClusterRpcService {
23 rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {} 23 rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
24 } 24 }
  25 +
25 message ClusterMessage { 26 message ClusterMessage {
26 MessageType messageType = 1; 27 MessageType messageType = 1;
27 MessageMataInfo messageMetaInfo = 2; 28 MessageMataInfo messageMetaInfo = 2;
@@ -139,4 +140,4 @@ message DeviceStateServiceMsgProto { @@ -139,4 +140,4 @@ message DeviceStateServiceMsgProto {
139 bool added = 5; 140 bool added = 5;
140 bool updated = 6; 141 bool updated = 6;
141 bool deleted = 7; 142 bool deleted = 7;
142 -}  
  143 +}
@@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; @@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
38 import org.thingsboard.server.common.data.security.Authority; 38 import org.thingsboard.server.common.data.security.Authority;
39 import org.thingsboard.server.common.msg.TbMsg; 39 import org.thingsboard.server.common.msg.TbMsg;
40 import org.thingsboard.server.common.msg.TbMsgMetaData; 40 import org.thingsboard.server.common.msg.TbMsgMetaData;
  41 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
41 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 42 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
42 import org.thingsboard.server.controller.AbstractRuleEngineControllerTest; 43 import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
43 import org.thingsboard.server.dao.attributes.AttributesService; 44 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -155,7 +156,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -155,7 +156,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
155 device.getId(), 156 device.getId(),
156 new TbMsgMetaData(), 157 new TbMsgMetaData(),
157 "{}", null, null, 0L); 158 "{}", null, null, 0L);
158 - actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); 159 + actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)));
159 160
160 Thread.sleep(3000); 161 Thread.sleep(3000);
161 162
@@ -270,7 +271,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule @@ -270,7 +271,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
270 device.getId(), 271 device.getId(),
271 new TbMsgMetaData(), 272 new TbMsgMetaData(),
272 "{}", null, null, 0L); 273 "{}", null, null, 0L);
273 - actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); 274 + actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)));
274 275
275 Thread.sleep(3000); 276 Thread.sleep(3000);
276 277
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
39 import org.thingsboard.server.common.data.security.Authority; 39 import org.thingsboard.server.common.data.security.Authority;
40 import org.thingsboard.server.common.msg.TbMsg; 40 import org.thingsboard.server.common.msg.TbMsg;
41 import org.thingsboard.server.common.msg.TbMsgMetaData; 41 import org.thingsboard.server.common.msg.TbMsgMetaData;
  42 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
42 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; 43 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
43 import org.thingsboard.server.controller.AbstractRuleEngineControllerTest; 44 import org.thingsboard.server.controller.AbstractRuleEngineControllerTest;
44 import org.thingsboard.server.dao.attributes.AttributesService; 45 import org.thingsboard.server.dao.attributes.AttributesService;
@@ -142,7 +143,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac @@ -142,7 +143,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
142 new TbMsgMetaData(), 143 new TbMsgMetaData(),
143 "{}", 144 "{}",
144 null, null, 0L); 145 null, null, 0L);
145 - actorService.onMsg(new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)); 146 + actorService.onMsg(new SendToClusterMsg(device.getId(), new ServiceToRuleEngineMsg(savedTenant.getId(), tbMsg)));
146 147
147 Thread.sleep(3000); 148 Thread.sleep(3000);
148 149
@@ -21,11 +21,13 @@ import org.thingsboard.server.common.msg.MsgType; @@ -21,11 +21,13 @@ import org.thingsboard.server.common.msg.MsgType;
21 import org.thingsboard.server.common.msg.TbActorMsg; 21 import org.thingsboard.server.common.msg.TbActorMsg;
22 import org.thingsboard.server.common.msg.TbMsg; 22 import org.thingsboard.server.common.msg.TbMsg;
23 23
  24 +import java.io.Serializable;
  25 +
24 /** 26 /**
25 * Created by ashvayka on 15.03.18. 27 * Created by ashvayka on 15.03.18.
26 */ 28 */
27 @Data 29 @Data
28 -public final class ServiceToRuleEngineMsg implements TbActorMsg { 30 +public final class ServiceToRuleEngineMsg implements TbActorMsg, Serializable {
29 31
30 private final TenantId tenantId; 32 private final TenantId tenantId;
31 private final TbMsg tbMsg; 33 private final TbMsg tbMsg;
@@ -172,6 +172,22 @@ message ToServerRpcResponseMsg { @@ -172,6 +172,22 @@ message ToServerRpcResponseMsg {
172 string error = 3; 172 string error = 3;
173 } 173 }
174 174
  175 +//Used to report session state to tb-node and persist this state in the cache on the tb-node level.
  176 +message SubscriptionInfoProto {
  177 + int64 lastActivityTime = 1;
  178 + bool attributeSubscription = 2;
  179 + bool rpcSubscription = 3;
  180 +}
  181 +
  182 +message SessionSubscriptionInfoProto {
  183 + SessionInfoProto sessionInfo = 1;
  184 + SubscriptionInfoProto subscriptionInfo = 2;
  185 +}
  186 +
  187 +message DeviceSessionsCacheEntry {
  188 + repeated SessionSubscriptionInfoProto sessions = 1;
  189 +}
  190 +
175 message TransportToDeviceActorMsg { 191 message TransportToDeviceActorMsg {
176 SessionInfoProto sessionInfo = 1; 192 SessionInfoProto sessionInfo = 1;
177 SessionEventMsg sessionEvent = 2; 193 SessionEventMsg sessionEvent = 2;
@@ -182,6 +198,7 @@ message TransportToDeviceActorMsg { @@ -182,6 +198,7 @@ message TransportToDeviceActorMsg {
182 SubscribeToRPCMsg subscribeToRPC = 7; 198 SubscribeToRPCMsg subscribeToRPC = 7;
183 ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8; 199 ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8;
184 ToServerRpcRequestMsg toServerRPCCallRequest = 9; 200 ToServerRpcRequestMsg toServerRPCCallRequest = 9;
  201 + SubscriptionInfoProto subscriptionInfo = 10;
185 } 202 }
186 203
187 message DeviceActorToTransportMsg { 204 message DeviceActorToTransportMsg {
@@ -214,4 +231,4 @@ message TransportApiRequestMsg { @@ -214,4 +231,4 @@ message TransportApiRequestMsg {
214 message TransportApiResponseMsg { 231 message TransportApiResponseMsg {
215 ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1; 232 ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
216 GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2; 233 GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2;
217 -}  
  234 +}
@@ -23,7 +23,6 @@ @@ -23,7 +23,6 @@
23 <version>2.2.0-SNAPSHOT</version> 23 <version>2.2.0-SNAPSHOT</version>
24 <artifactId>thingsboard</artifactId> 24 <artifactId>thingsboard</artifactId>
25 </parent> 25 </parent>
26 - <groupId>org.thingsboard</groupId>  
27 <artifactId>msa</artifactId> 26 <artifactId>msa</artifactId>
28 <packaging>pom</packaging> 27 <packaging>pom</packaging>
29 28
@@ -31,6 +31,8 @@ public final class RuleEngineDeviceRpcRequest { @@ -31,6 +31,8 @@ public final class RuleEngineDeviceRpcRequest {
31 private final DeviceId deviceId; 31 private final DeviceId deviceId;
32 private final int requestId; 32 private final int requestId;
33 private final UUID requestUUID; 33 private final UUID requestUUID;
  34 + private final String originHost;
  35 + private final int originPort;
34 private final boolean oneway; 36 private final boolean oneway;
35 private final String method; 37 private final String method;
36 private final String body; 38 private final String body;
@@ -86,6 +86,10 @@ public class TbSendRPCRequestNode implements TbNode { @@ -86,6 +86,10 @@ public class TbSendRPCRequestNode implements TbNode {
86 86
87 tmp = msg.getMetaData().getValue("requestUUID"); 87 tmp = msg.getMetaData().getValue("requestUUID");
88 UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : UUIDs.timeBased(); 88 UUID requestUUID = !StringUtils.isEmpty(tmp) ? UUID.fromString(tmp) : UUIDs.timeBased();
  89 + tmp = msg.getMetaData().getValue("originHost");
  90 + String originHost = !StringUtils.isEmpty(tmp) ? tmp : null;
  91 + tmp = msg.getMetaData().getValue("originPort");
  92 + int originPort = !StringUtils.isEmpty(tmp) ? Integer.parseInt(tmp) : 0;
89 93
90 tmp = msg.getMetaData().getValue("expirationTime"); 94 tmp = msg.getMetaData().getValue("expirationTime");
91 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds())); 95 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
@@ -105,6 +109,8 @@ public class TbSendRPCRequestNode implements TbNode { @@ -105,6 +109,8 @@ public class TbSendRPCRequestNode implements TbNode {
105 .deviceId(new DeviceId(msg.getOriginator().getId())) 109 .deviceId(new DeviceId(msg.getOriginator().getId()))
106 .requestId(requestId) 110 .requestId(requestId)
107 .requestUUID(requestUUID) 111 .requestUUID(requestUUID)
  112 + .originHost(originHost)
  113 + .originPort(originPort)
108 .expirationTime(expirationTime) 114 .expirationTime(expirationTime)
109 .restApiCall(restApiCall) 115 .restApiCall(restApiCall)
110 .build(); 116 .build();