Commit fde9eb023f29f03320dcec4c219a8d60ec2d7141

Authored by Andrew Shvayka
1 parent 55127ae6

Added support of RPC calls in a cluster mode

... ... @@ -38,6 +38,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
38 38 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
39 39 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
40 40 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  41 +import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
41 42 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
42 43 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
43 44 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
... ... @@ -112,21 +113,27 @@ public class AppActor extends RuleChainManagerActor {
112 113 case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
113 114 onToDeviceActorMsg((TenantAwareMsg) msg);
114 115 break;
  116 + case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
  117 + onToDeviceSessionMsg((BasicActorSystemToDeviceSessionActorMsg) msg);
115 118 default:
116 119 return false;
117 120 }
118 121 return true;
119 122 }
120 123
  124 + private void onToDeviceSessionMsg(BasicActorSystemToDeviceSessionActorMsg msg) {
  125 + systemContext.getSessionManagerActor().tell(msg, self());
  126 + }
  127 +
121 128 private void onPossibleClusterMsg(SendToClusterMsg msg) {
122 129 Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
123   - if (address.isPresent()) {
124   - systemContext.getRpcService().tell(
125   - systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
126   - } else {
127   - self().tell(msg.getMsg(), ActorRef.noSender());
128   - }
  130 + if (address.isPresent()) {
  131 + systemContext.getRpcService().tell(
  132 + systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
  133 + } else {
  134 + self().tell(msg.getMsg(), ActorRef.noSender());
129 135 }
  136 + }
130 137
131 138 private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
132 139 if (SYSTEM_TENANT.equals(msg.getTenantId())) {
... ...
... ... @@ -163,7 +163,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
163 163
164 164 if (request.isOneway() && sent) {
165 165 logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
166   - systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
  166 + systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(msg.getMsg().getId(), msg.getServerAddress(), null, null));
167 167 } else {
168 168 registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
169 169 }
... ... @@ -185,8 +185,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
185 185 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
186 186 if (requestMd != null) {
187 187 logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
188   - systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
189   - null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
  188 + systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  189 + requestMd.getMsg().getServerAddress(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
190 190 }
191 191 }
192 192
... ... @@ -234,11 +234,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
234 234
235 235 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, SessionId sessionId, Optional<ServerAddress> server, Set<Integer> sentOneWayIds) {
236 236 return entry -> {
  237 + ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();
237 238 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
238 239 ToDeviceRpcRequestBody body = request.getBody();
239 240 if (request.isOneway()) {
240 241 sentOneWayIds.add(entry.getKey());
241   - systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(request.getId(), null, null));
  242 + systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(request.getId(), requestActorMsg.getServerAddress(), null, null));
242 243 }
243 244 ToDeviceRpcRequestMsg rpcRequest = new ToDeviceRpcRequestMsg(
244 245 entry.getKey(),
... ... @@ -360,7 +361,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
360 361 kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
361 362 }
362 363 TbMsgMetaData metaData = defaultMetaData.copy();
363   - metaData.putValue("ts", entry.getKey()+"");
  364 + metaData.putValue("ts", entry.getKey() + "");
364 365 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
365 366 pushToRuleEngineWithTimeout(context, tbMsg, msgData);
366 367 }
... ... @@ -451,7 +452,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
451 452 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
452 453 boolean success = requestMd != null;
453 454 if (success) {
454   - systemContext.getDeviceRpcService().process(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getData(), null));
  455 + systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  456 + requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));
455 457 } else {
456 458 logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
457 459 }
... ...
... ... @@ -114,6 +114,7 @@ public class RpcManagerActor extends ContextAwareActor {
114 114 logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
115 115 }
116 116 }
  117 +
117 118 @Override
118 119 public void postStop() {
119 120 sessionActors.clear();
... ... @@ -157,7 +158,7 @@ public class RpcManagerActor extends ContextAwareActor {
157 158 private void onSessionClose(boolean reconnect, ServerAddress remoteAddress) {
158 159 log.debug("[{}] session closed. Should reconnect: {}", remoteAddress, reconnect);
159 160 SessionActorInfo sessionRef = sessionActors.get(remoteAddress);
160   - if (context().sender().equals(sessionRef.actor)) {
  161 + if (context().sender() != null && context().sender().equals(sessionRef.actor)) {
161 162 sessionActors.remove(remoteAddress);
162 163 pendingMsgs.remove(remoteAddress);
163 164 if (reconnect) {
... ...
... ... @@ -27,7 +27,6 @@ import org.thingsboard.rule.engine.api.ScriptEngine;
27 27 import org.thingsboard.rule.engine.api.TbContext;
28 28 import org.thingsboard.rule.engine.api.TbRelationTypes;
29 29 import org.thingsboard.server.actors.ActorSystemContext;
30   -import org.thingsboard.server.common.data.DataConstants;
31 30 import org.thingsboard.server.common.data.id.DeviceId;
32 31 import org.thingsboard.server.common.data.id.EntityId;
33 32 import org.thingsboard.server.common.data.id.RuleNodeId;
... ... @@ -224,7 +223,7 @@ class DefaultTbContext implements TbContext {
224 223 public void sendRpcRequest(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
225 224 ToDeviceRpcRequest request = new ToDeviceRpcRequest(UUIDs.timeBased(), nodeCtx.getTenantId(), src.getDeviceId(),
226 225 src.isOneway(), System.currentTimeMillis() + src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()));
227   - mainCtx.getDeviceRpcService().process(request, response -> {
  226 + mainCtx.getDeviceRpcService().processRpcRequestToDevice(request, response -> {
228 227 consumer.accept(RuleEngineDeviceRpcResponse.builder()
229 228 .deviceId(src.getDeviceId())
230 229 .requestId(src.getRequestId())
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ...
... ... @@ -233,6 +233,9 @@ public class DefaultActorService implements ActorService {
233 233 case CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE:
234 234 actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray());
235 235 break;
  236 + case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE:
  237 + actorContext.getDeviceRpcService().processRemoteResponseFromDevice(serverAddress, msg.getPayload().toByteArray());
  238 + break;
236 239 }
237 240 }
238 241
... ...
... ... @@ -128,7 +128,7 @@ public class RpcController extends BaseController {
128 128 timeout,
129 129 body
130 130 );
131   - deviceRpcService.process(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
  131 + deviceRpcService.processRpcRequestToDevice(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse));
132 132 }
133 133
134 134 @Override
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.service.rpc;
17 17
  18 +import com.google.protobuf.InvalidProtocolBufferException;
18 19 import lombok.extern.slf4j.Slf4j;
19 20 import org.springframework.beans.factory.annotation.Autowired;
20 21 import org.springframework.stereotype.Service;
... ... @@ -28,11 +29,15 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
28 29 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
29 30 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
30 31 import org.thingsboard.server.dao.audit.AuditLogService;
  32 +import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
31 33 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
32 34 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
  35 +import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
  36 +import org.thingsboard.server.service.telemetry.sub.Subscription;
33 37
34 38 import javax.annotation.PostConstruct;
35 39 import javax.annotation.PreDestroy;
  40 +import java.util.Optional;
36 41 import java.util.UUID;
37 42 import java.util.concurrent.ConcurrentHashMap;
38 43 import java.util.concurrent.ConcurrentMap;
... ... @@ -57,14 +62,10 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
57 62 @Autowired
58 63 private ActorService actorService;
59 64
60   - @Autowired
61   - private AuditLogService auditLogService;
62   -
63 65 private ScheduledExecutorService rpcCallBackExecutor;
64 66
65 67 private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> localRpcRequests = new ConcurrentHashMap<>();
66 68
67   -
68 69 @PostConstruct
69 70 public void initExecutor() {
70 71 rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor();
... ... @@ -78,7 +79,7 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
78 79 }
79 80
80 81 @Override
81   - public void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
  82 + public void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
82 83 log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
83 84 sendRpcRequest(request);
84 85 UUID requestId = request.getId();
... ... @@ -89,33 +90,48 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
89 90 log.error("[{}] timeout the request: [{}]", this.hashCode(), requestId);
90 91 Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
91 92 if (consumer != null) {
92   - consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
  93 + consumer.accept(new FromDeviceRpcResponse(requestId, null, null, RpcError.TIMEOUT));
93 94 }
94 95 }, timeout, TimeUnit.MILLISECONDS);
95 96 }
96 97
97 98 @Override
98   - public void process(ToDeviceRpcRequest request, ServerAddress originator) {
99   -// if (pluginServerAddress.isPresent()) {
100   -// systemContext.getRpcService().tell(pluginServerAddress.get(), responsePluginMsg);
101   -// logger.debug("[{}] Rpc command response sent to remote plugin actor [{}]!", deviceId, requestMd.getMsg().getMsg().getId());
102   -// } else {
103   -// context.parent().tell(responsePluginMsg, ActorRef.noSender());
104   -// logger.debug("[{}] Rpc command response sent to local plugin actor [{}]!", deviceId, requestMd.getMsg().getMsg().getId());
105   -// }
  99 + public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
  100 + log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
  101 + if (routingService.getCurrentServer().equals(response.getServerAddress())) {
  102 + UUID requestId = response.getId();
  103 + Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
  104 + if (consumer != null) {
  105 + consumer.accept(response);
  106 + } else {
  107 + log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
  108 + }
  109 + } else {
  110 + ClusterAPIProtos.FromDeviceRPCResponseProto.Builder builder = ClusterAPIProtos.FromDeviceRPCResponseProto.newBuilder();
  111 + builder.setRequestIdMSB(response.getId().getMostSignificantBits());
  112 + builder.setRequestIdLSB(response.getId().getLeastSignificantBits());
  113 + response.getResponse().ifPresent(builder::setResponse);
  114 + if (response.getError().isPresent()) {
  115 + builder.setError(response.getError().get().ordinal());
  116 + } else {
  117 + builder.setError(-1);
  118 + }
  119 + rpcService.tell(response.getServerAddress(), ClusterAPIProtos.MessageType.CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE, builder.build().toByteArray());
  120 + }
106 121 }
107 122
108 123 @Override
109   - public void process(FromDeviceRpcResponse response) {
110   - log.error("[{}] response the request: [{}]", this.hashCode(), response.getId());
111   - //TODO: send to another server if needed.
112   - UUID requestId = response.getId();
113   - Consumer<FromDeviceRpcResponse> consumer = localRpcRequests.remove(requestId);
114   - if (consumer != null) {
115   - consumer.accept(response);
116   - } else {
117   - log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
  124 + public void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] data) {
  125 + ClusterAPIProtos.FromDeviceRPCResponseProto proto;
  126 + try {
  127 + proto = ClusterAPIProtos.FromDeviceRPCResponseProto.parseFrom(data);
  128 + } catch (InvalidProtocolBufferException e) {
  129 + throw new RuntimeException(e);
118 130 }
  131 + RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
  132 + FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), routingService.getCurrentServer(),
  133 + proto.getResponse(), error);
  134 + processRpcResponseFromDevice(response);
119 135 }
120 136
121 137 @Override
... ... @@ -125,8 +141,8 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
125 141 }
126 142
127 143 private void sendRpcRequest(ToDeviceRpcRequest msg) {
  144 + ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(routingService.getCurrentServer(), msg);
128 145 log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
129   - ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
130 146 forward(msg.getDeviceId(), rpcMsg);
131 147 }
132 148
... ...
... ... @@ -27,12 +27,11 @@ import java.util.function.Consumer;
27 27 */
28 28 public interface DeviceRpcService {
29 29
30   - void process(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
  30 + void processRpcRequestToDevice(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer);
31 31
32   - void process(ToDeviceRpcRequest request, ServerAddress originator);
33   -
34   - void process(FromDeviceRpcResponse response);
  32 + void processRpcResponseFromDevice(FromDeviceRpcResponse response);
35 33
36 34 void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body);
37 35
  36 + void processRemoteResponseFromDevice(ServerAddress serverAddress, byte[] bytes);
38 37 }
... ...
... ... @@ -19,6 +19,7 @@ import lombok.Getter;
19 19 import lombok.RequiredArgsConstructor;
20 20 import lombok.ToString;
21 21 import org.thingsboard.rule.engine.api.RpcError;
  22 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
22 23
23 24 import java.util.Optional;
24 25 import java.util.UUID;
... ... @@ -31,6 +32,8 @@ import java.util.UUID;
31 32 public class FromDeviceRpcResponse {
32 33 @Getter
33 34 private final UUID id;
  35 + @Getter
  36 + private final ServerAddress serverAddress;
34 37 private final String response;
35 38 private final RpcError error;
36 39
... ...
... ... @@ -34,18 +34,11 @@ import java.util.Optional;
34 34 @RequiredArgsConstructor
35 35 public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg {
36 36
  37 + @Getter
37 38 private final ServerAddress serverAddress;
38 39 @Getter
39 40 private final ToDeviceRpcRequest msg;
40 41
41   - public ToDeviceRpcRequestActorMsg(ToDeviceRpcRequest msg) {
42   - this(null, msg);
43   - }
44   -
45   - public Optional<ServerAddress> getServerAddress() {
46   - return Optional.ofNullable(serverAddress);
47   - }
48   -
49 42 @Override
50 43 public DeviceId getDeviceId() {
51 44 return msg.getDeviceId();
... ...
... ... @@ -34,8 +34,6 @@ import java.util.Optional;
34 34 @RequiredArgsConstructor
35 35 public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg {
36 36
37   - private final ServerAddress serverAddress;
38   -
39 37 @Getter
40 38 private final TenantId tenantId;
41 39
... ... @@ -45,14 +43,6 @@ public class ToServerRpcResponseActorMsg implements ToDeviceActorNotificationMsg
45 43 @Getter
46 44 private final ToServerRpcResponseMsg msg;
47 45
48   - public ToServerRpcResponseActorMsg(TenantId tenantId, DeviceId deviceId, ToServerRpcResponseMsg msg) {
49   - this(null, tenantId, deviceId, msg);
50   - }
51   -
52   - public Optional<ServerAddress> getServerAddress() {
53   - return Optional.ofNullable(serverAddress);
54   - }
55   -
56 46 @Override
57 47 public MsgType getMsgType() {
58 48 return MsgType.SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG;
... ...
... ... @@ -56,6 +56,7 @@ enum MessageType {
56 56 CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE = 9;
57 57 CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE = 10;
58 58 CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE = 11;
  59 + CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE = 12;
59 60 }
60 61
61 62 // Messages related to CLUSTER_TELEMETRY_MESSAGE
... ... @@ -121,3 +122,9 @@ message KeyValueProto {
121 122 bool boolValue = 7;
122 123 }
123 124
  125 +message FromDeviceRPCResponseProto {
  126 + int64 requestIdMSB = 1;
  127 + int64 requestIdLSB = 2;
  128 + string response = 3;
  129 + int32 error = 4;
  130 +}
... ...
... ... @@ -42,7 +42,7 @@ public class BasicActorSystemToDeviceSessionActorMsg implements ActorSystemToDev
42 42
43 43 @Override
44 44 public String toString() {
45   - return "BasicToSessionResponseMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
  45 + return "BasicActorSystemToDeviceSessionActorMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
46 46 }
47 47
48 48 @Override
... ...