Commit 6a8b099eed6e88b17cd20f69f4ce4b6961b8d8b0

Authored by Yura
1 parent 711ff2ab

init cluster refactoring

Showing 31 changed files with 425 additions and 517 deletions
... ... @@ -60,6 +60,7 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
60 60 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
61 61 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
62 62 import org.thingsboard.server.service.component.ComponentDiscoveryService;
  63 +import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
63 64 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
64 65 import org.thingsboard.server.service.executors.ExternalCallExecutorService;
65 66 import org.thingsboard.server.service.mail.MailExecutorService;
... ... @@ -103,6 +104,10 @@ public class ActorSystemContext {
103 104
104 105 @Autowired
105 106 @Getter
  107 + private DataDecodingEncodingService encodingService;
  108 +
  109 + @Autowired
  110 + @Getter
106 111 private DeviceAuthService deviceAuthService;
107 112
108 113 @Autowired
... ...
... ... @@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
34 34 import org.thingsboard.server.common.msg.TbActorMsg;
35 35 import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
36 36 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
  37 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
  38 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
37 39 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
38 40 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
39 41 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
... ... @@ -45,6 +47,7 @@ import scala.concurrent.duration.Duration;
45 47
46 48 import java.util.HashMap;
47 49 import java.util.Map;
  50 +import java.util.Optional;
48 51
49 52 public class AppActor extends RuleChainManagerActor {
50 53
... ... @@ -89,6 +92,9 @@ public class AppActor extends RuleChainManagerActor {
89 92 @Override
90 93 protected boolean process(TbActorMsg msg) {
91 94 switch (msg.getMsgType()) {
  95 + case SEND_TO_CLUSTER_MSG:
  96 + onPossibleClusterMsg((SendToClusterMsg) msg);
  97 + break;
92 98 case CLUSTER_EVENT_MSG:
93 99 broadcast(msg);
94 100 break;
... ... @@ -112,6 +118,16 @@ public class AppActor extends RuleChainManagerActor {
112 118 return true;
113 119 }
114 120
  121 + private void onPossibleClusterMsg(SendToClusterMsg msg) {
  122 + Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(msg.getEntityId());
  123 + if (address.isPresent()) {
  124 + systemContext.getRpcService().tell(
  125 + systemContext.getEncodingService().convertToProtoDataMessage(address.get(), msg.getMsg()));
  126 + } else {
  127 + self().tell(msg.getMsg(), ActorRef.noSender());
  128 + }
  129 + }
  130 +
115 131 private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
116 132 if (SYSTEM_TENANT.equals(msg.getTenantId())) {
117 133 //TODO: ashvayka handle this.
... ...
... ... @@ -74,6 +74,7 @@ import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotific
74 74 import org.thingsboard.server.extensions.api.device.DeviceNameOrTypeUpdateMsg;
75 75 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
76 76 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
  77 +import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
77 78 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
78 79 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
79 80
... ... @@ -521,7 +522,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
521 522 if (sessionAddress.isPresent()) {
522 523 ServerAddress address = sessionAddress.get();
523 524 logger.debug("{} Forwarding msg: {}", address, response);
524   - systemContext.getRpcService().tell(sessionAddress.get(), response);
  525 + systemContext.getRpcService().tell(systemContext.getEncodingService()
  526 + .convertToProtoDataMessage(sessionAddress.get(), response));
525 527 } else {
526 528 systemContext.getSessionManagerActor().tell(response, ActorRef.noSender());
527 529 }
... ...
... ... @@ -82,7 +82,8 @@ public final class PluginProcessingContext implements PluginContext {
82 82
83 83 @Override
84 84 public void sendPluginRpcMsg(RpcMsg msg) {
85   - this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
  85 + //ToDO is this a cluster messsage?
  86 +// this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
86 87 }
87 88
88 89 @Override
... ...
... ... @@ -21,6 +21,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
21 21 import org.thingsboard.server.common.data.id.DeviceId;
22 22 import org.thingsboard.server.common.data.id.PluginId;
23 23 import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.common.msg.TbActorMsg;
24 25 import org.thingsboard.server.common.msg.cluster.ServerAddress;
25 26 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
26 27 import org.thingsboard.server.common.msg.timeout.TimeoutMsg;
... ... @@ -100,7 +101,7 @@ public final class SharedPluginProcessingContext {
100 101 }
101 102
102 103 public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
103   - forward(msg.getDeviceId(), msg, rpcService::tell);
  104 + forward(msg.getDeviceId(), msg);
104 105 }
105 106
106 107 public void sendRpcRequest(ToDeviceRpcRequest msg) {
... ... @@ -109,11 +110,11 @@ public final class SharedPluginProcessingContext {
109 110 // forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
110 111 }
111 112
112   - private <T> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
  113 + private <T extends TbActorMsg> void forward(DeviceId deviceId, T msg) {
113 114 Optional<ServerAddress> instance = routingService.resolveById(deviceId);
114 115 if (instance.isPresent()) {
115 116 log.trace("[{}] Forwarding msg {} to remote device actor!", pluginId, msg);
116   - rpcFunction.accept(instance.get(), msg);
  117 + rpcService.tell(systemContext.getEncodingService().convertToProtoDataMessage(instance.get(), msg));
117 118 } else {
118 119 log.trace("[{}] Forwarding msg {} to local device actor!", pluginId, msg);
119 120 parentActor.tell(msg, ActorRef.noSender());
... ...
... ... @@ -17,30 +17,11 @@ package org.thingsboard.server.actors.rpc;
17 17
18 18 import akka.actor.ActorRef;
19 19 import lombok.extern.slf4j.Slf4j;
20   -import org.springframework.util.SerializationUtils;
21   -import org.springframework.util.StringUtils;
22 20 import org.thingsboard.server.actors.ActorSystemContext;
23 21 import org.thingsboard.server.actors.service.ActorService;
24   -import org.thingsboard.server.common.data.id.DeviceId;
25   -import org.thingsboard.server.common.data.id.PluginId;
26   -import org.thingsboard.server.common.data.id.TenantId;
27   -import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
28   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
29   -import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
30   -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
31   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
32   -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
33   -import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
34   -import org.thingsboard.server.extensions.api.plugins.msg.*;
35   -import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
36   -import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
37 22 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
38 23 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
39 24 import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
40   -import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
41   -
42   -import java.io.Serializable;
43   -import java.util.UUID;
44 25
45 26 /**
46 27 * @author Andrew Shvayka
... ... @@ -76,48 +57,13 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
76 57 }
77 58
78 59 @Override
79   - public void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg) {
80   - if (log.isTraceEnabled()) {
81   - log.trace("{} session [{}] received plugin msg {}", getType(session), session.getRemoteServer(), msg);
82   - }
83   - service.onMsg(convert(session.getRemoteServer(), msg));
84   - }
85   -
86   - @Override
87   - public void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg) {
88   - log.trace("{} session [{}] received device actor msg {}", getType(session), session.getRemoteServer(), msg);
89   - service.onMsg((DeviceToDeviceActorMsg) deserialize(msg.getData().toByteArray()));
90   - }
91   -
92   - @Override
93   - public void onToDeviceActorNotificationRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg) {
94   - log.trace("{} session [{}] received device actor notification msg {}", getType(session), session.getRemoteServer(), msg);
95   - service.onMsg((ToDeviceActorNotificationMsg) deserialize(msg.getData().toByteArray()));
96   - }
97   -
98   - @Override
99   - public void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg) {
100   - log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
101   - service.onMsg((ToDeviceSessionActorMsg) deserialize(msg.getData().toByteArray()));
102   - }
103   -
104   - @Override
105   - public void onToDeviceRpcRequestRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
106   - log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
107   - service.onMsg(deserialize(session.getRemoteServer(), msg));
108   - }
109   -
110   - @Override
111   - public void onFromDeviceRpcResponseRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
112   - log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
113   - service.onMsg(deserialize(session.getRemoteServer(), msg));
  60 + public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
  61 + log.trace("{} Service [{}] received session actor msg {}", getType(session),
  62 + session.getRemoteServer(),
  63 + clusterMessage);
  64 + service.onRecievedMsg(clusterMessage);
114 65 }
115 66
116   - @Override
117   - public void onToAllNodesRpcMessage(GrpcSession session, ClusterAPIProtos.ToAllNodesRpcMessage msg) {
118   - log.trace(SESSION_RECEIVED_SESSION_ACTOR_MSG, getType(session), session.getRemoteServer(), msg);
119   - service.onMsg((ToAllNodesMsg) deserialize(msg.getData().toByteArray()));
120   - }
121 67
122 68 @Override
123 69 public void onError(GrpcSession session, Throwable t) {
... ... @@ -130,37 +76,5 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
130 76 return session.isClient() ? "Client" : "Server";
131 77 }
132 78
133   - private static PluginRpcMsg convert(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcMessage msg) {
134   - ClusterAPIProtos.PluginAddress address = msg.getAddress();
135   - TenantId tenantId = new TenantId(toUUID(address.getTenantId()));
136   - PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
137   - RpcMsg rpcMsg = new RpcMsg(serverAddress, msg.getClazz(), msg.getData().toByteArray());
138   - return new PluginRpcMsg(tenantId, pluginId, rpcMsg);
139   - }
140   -
141   - private static UUID toUUID(ClusterAPIProtos.Uid uid) {
142   - return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
143   - }
144   -
145   - private static ToDeviceRpcRequestActorMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
146   - TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
147   - DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
148   -
149   - ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
150   - ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
151   -
152   - return new ToDeviceRpcRequestActorMsg(serverAddress, request);
153   - }
154   -
155   - private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
156   - RpcError error = !StringUtils.isEmpty(msg.getError()) ? RpcError.valueOf(msg.getError()) : null;
157   - FromDeviceRpcResponse response = new FromDeviceRpcResponse(toUUID(msg.getMsgId()), msg.getResponse(), error);
158   - return new ToPluginRpcResponseDeviceMsg(null, null, response);
159   - }
160   -
161   - @SuppressWarnings("unchecked")
162   - private static <T extends Serializable> T deserialize(byte[] data) {
163   - return (T) SerializationUtils.deserialize(data);
164   - }
165 79
166 80 }
... ...
... ... @@ -23,5 +23,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
23 23 */
24 24 @Data
25 25 public final class RpcBroadcastMsg {
26   - private final ClusterAPIProtos.ToRpcServerMessage msg;
  26 + private final ClusterAPIProtos.ClusterMessage msg;
27 27 }
... ...
... ... @@ -40,7 +40,7 @@ public class RpcManagerActor extends ContextAwareActor {
40 40
41 41 private final Map<ServerAddress, SessionActorInfo> sessionActors;
42 42
43   - private final Map<ServerAddress, Queue<ClusterAPIProtos.ToRpcServerMessage>> pendingMsgs;
  43 + private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
44 44
45 45 private final ServerAddress instance;
46 46
... ... @@ -65,8 +65,8 @@ public class RpcManagerActor extends ContextAwareActor {
65 65
66 66 @Override
67 67 public void onReceive(Object msg) throws Exception {
68   - if (msg instanceof RpcSessionTellMsg) {
69   - onMsg((RpcSessionTellMsg) msg);
  68 + if (msg instanceof ClusterAPIProtos.ClusterMessage) {
  69 + onMsg((ClusterAPIProtos.ClusterMessage) msg);
70 70 } else if (msg instanceof RpcBroadcastMsg) {
71 71 onMsg((RpcBroadcastMsg) msg);
72 72 } else if (msg instanceof RpcSessionCreateRequestMsg) {
... ... @@ -84,27 +84,32 @@ public class RpcManagerActor extends ContextAwareActor {
84 84
85 85 private void onMsg(RpcBroadcastMsg msg) {
86 86 log.debug("Forwarding msg to session actors {}", msg);
87   - sessionActors.keySet().forEach(address -> onMsg(new RpcSessionTellMsg(address, msg.getMsg())));
  87 + sessionActors.keySet().forEach(address -> onMsg(msg.getMsg()));
88 88 pendingMsgs.values().forEach(queue -> queue.add(msg.getMsg()));
89 89 }
90 90
91   - private void onMsg(RpcSessionTellMsg msg) {
92   - ServerAddress address = msg.getServerAddress();
93   - SessionActorInfo session = sessionActors.get(address);
94   - if (session != null) {
95   - log.debug("{} Forwarding msg to session actor", address);
96   - session.actor.tell(msg, ActorRef.noSender());
97   - } else {
98   - log.debug("{} Storing msg to pending queue", address);
99   - Queue<ClusterAPIProtos.ToRpcServerMessage> queue = pendingMsgs.get(address);
100   - if (queue == null) {
101   - queue = new LinkedList<>();
102   - pendingMsgs.put(address, queue);
  91 + private void onMsg(ClusterAPIProtos.ClusterMessage msg) {
  92 + if (msg.hasServerAdresss()) {
  93 + ServerAddress address = new ServerAddress(msg.getServerAdresss().getHost(),
  94 + msg.getServerAdresss().getPort());
  95 + SessionActorInfo session = sessionActors.get(address);
  96 + if (session != null) {
  97 + log.debug("{} Forwarding msg to session actor", address);
  98 + session.getActor().tell(msg, ActorRef.noSender());
  99 + } else {
  100 + log.debug("{} Storing msg to pending queue", address);
  101 + Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
  102 + if (queue == null) {
  103 + queue = new LinkedList<>();
  104 + pendingMsgs.put(new ServerAddress(
  105 + msg.getServerAdresss().getHost(), msg.getServerAdresss().getPort()), queue);
  106 + }
  107 + queue.add(msg);
103 108 }
104   - queue.add(msg.getMsg());
  109 + } else {
  110 + logger.warning("Cluster msg doesn't have set Server Address [{}]", msg);
105 111 }
106 112 }
107   -
108 113 @Override
109 114 public void postStop() {
110 115 sessionActors.clear();
... ... @@ -167,10 +172,10 @@ public class RpcManagerActor extends ContextAwareActor {
167 172 private void register(ServerAddress remoteAddress, UUID uuid, ActorRef sender) {
168 173 sessionActors.put(remoteAddress, new SessionActorInfo(uuid, sender));
169 174 log.debug("[{}][{}] Registering session actor.", remoteAddress, uuid);
170   - Queue<ClusterAPIProtos.ToRpcServerMessage> data = pendingMsgs.remove(remoteAddress);
  175 + Queue<ClusterAPIProtos.ClusterMessage> data = pendingMsgs.remove(remoteAddress);
171 176 if (data != null) {
172 177 log.debug("[{}][{}] Forwarding {} pending messages.", remoteAddress, uuid, data.size());
173   - data.forEach(msg -> sender.tell(new RpcSessionTellMsg(remoteAddress, msg), ActorRef.noSender()));
  178 + data.forEach(msg -> sender.tell(new RpcSessionTellMsg(msg), ActorRef.noSender()));
174 179 } else {
175 180 log.debug("[{}][{}] No pending messages to forward.", remoteAddress, uuid);
176 181 }
... ...
... ... @@ -32,6 +32,8 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
32 32
33 33 import java.util.UUID;
34 34
  35 +import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE;
  36 +
35 37 /**
36 38 * @author Andrew Shvayka
37 39 */
... ... @@ -56,15 +58,15 @@ public class RpcSessionActor extends ContextAwareActor {
56 58
57 59 @Override
58 60 public void onReceive(Object msg) throws Exception {
59   - if (msg instanceof RpcSessionTellMsg) {
60   - tell((RpcSessionTellMsg) msg);
  61 + if (msg instanceof ClusterAPIProtos.ClusterMessage) {
  62 + tell((ClusterAPIProtos.ClusterMessage) msg);
61 63 } else if (msg instanceof RpcSessionCreateRequestMsg) {
62 64 initSession((RpcSessionCreateRequestMsg) msg);
63 65 }
64 66 }
65 67
66   - private void tell(RpcSessionTellMsg msg) {
67   - session.sendMsg(msg.getMsg());
  68 + private void tell(ClusterAPIProtos.ClusterMessage msg) {
  69 + session.sendMsg(msg);
68 70 }
69 71
70 72 @Override
... ... @@ -91,7 +93,7 @@ public class RpcSessionActor extends ContextAwareActor {
91 93 session.initInputStream();
92 94
93 95 ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
94   - StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream = stub.handlePluginMsgs(session.getInputStream());
  96 + StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream = stub.handleMsgs(session.getInputStream());
95 97
96 98 session.setOutputStream(outputStream);
97 99 session.initOutputStream();
... ... @@ -115,11 +117,10 @@ public class RpcSessionActor extends ContextAwareActor {
115 117 }
116 118 }
117 119
118   - private ClusterAPIProtos.ToRpcServerMessage toConnectMsg() {
  120 + private ClusterAPIProtos.ClusterMessage toConnectMsg() {
119 121 ServerAddress instance = systemContext.getDiscoveryService().getCurrentServer().getServerAddress();
120   - return ClusterAPIProtos.ToRpcServerMessage.newBuilder().setConnectMsg(
121   - ClusterAPIProtos.ConnectRpcMessage.newBuilder().setServerAddress(
122   - ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost()).setPort(instance.getPort()).build()).build()).build();
123   -
  122 + return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAdresss(
  123 + ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost())
  124 + .setPort(instance.getPort()).build()).build();
124 125 }
125 126 }
... ...
... ... @@ -30,6 +30,6 @@ public final class RpcSessionCreateRequestMsg {
30 30
31 31 private final UUID msgUid;
32 32 private final ServerAddress remoteAddress;
33   - private final StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver;
  33 + private final StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver;
34 34
35 35 }
... ...
... ... @@ -24,6 +24,5 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
24 24 */
25 25 @Data
26 26 public final class RpcSessionTellMsg {
27   - private final ServerAddress serverAddress;
28   - private final ClusterAPIProtos.ToRpcServerMessage msg;
  27 + private final ClusterAPIProtos.ClusterMessage msg;
29 28 }
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.service;
18 18 import org.thingsboard.server.common.data.id.*;
19 19 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
20 20 import org.thingsboard.server.common.msg.TbMsg;
  21 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
21 22 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
22 23 import org.thingsboard.server.common.transport.SessionMsgProcessor;
23 24 import org.thingsboard.server.service.cluster.discovery.DiscoveryServiceListener;
... ... @@ -27,10 +28,11 @@ public interface ActorService extends SessionMsgProcessor, WebSocketMsgProcessor
27 28
28 29 void onEntityStateChange(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
29 30
30   - void onMsg(ServiceToRuleEngineMsg msg);
  31 + void onMsg(SendToClusterMsg msg);
31 32
32 33 void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId);
33 34
34 35 void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType);
35 36
  37 + void onMsg(ServiceToRuleEngineMsg serviceToRuleEngineMsg);
36 38 }
... ...
... ... @@ -19,6 +19,7 @@ import akka.actor.ActorRef;
19 19 import akka.actor.ActorSystem;
20 20 import akka.actor.Props;
21 21 import akka.actor.Terminated;
  22 +import com.google.protobuf.ByteString;
22 23 import lombok.extern.slf4j.Slf4j;
23 24 import org.springframework.beans.factory.annotation.Autowired;
24 25 import org.springframework.stereotype.Service;
... ... @@ -32,8 +33,10 @@ import org.thingsboard.server.actors.session.SessionManagerActor;
32 33 import org.thingsboard.server.actors.stats.StatsActor;
33 34 import org.thingsboard.server.common.data.id.*;
34 35 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
  36 +import org.thingsboard.server.common.msg.TbActorMsg;
35 37 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
36 38 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  39 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
37 40 import org.thingsboard.server.common.msg.cluster.ServerAddress;
38 41 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
39 42 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
... ... @@ -46,6 +49,7 @@ import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg
46 49 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
47 50 import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
48 51 import org.thingsboard.server.extensions.api.plugins.ws.msg.PluginWebsocketMsg;
  52 +import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
49 53 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
50 54 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
51 55 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
... ... @@ -57,6 +61,9 @@ import javax.annotation.PostConstruct;
57 61 import javax.annotation.PreDestroy;
58 62 import java.util.Optional;
59 63
  64 +import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE;
  65 +import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.RPC_BROADCAST_MSG;
  66 +
60 67 @Service
61 68 @Slf4j
62 69 public class DefaultActorService implements ActorService {
... ... @@ -127,7 +134,7 @@ public class DefaultActorService implements ActorService {
127 134 }
128 135
129 136 @Override
130   - public void onMsg(ServiceToRuleEngineMsg msg) {
  137 + public void onMsg(SendToClusterMsg msg) {
131 138 appActor.tell(msg, ActorRef.noSender());
132 139 }
133 140
... ... @@ -149,53 +156,7 @@ public class DefaultActorService implements ActorService {
149 156 appActor.tell(msg, ActorRef.noSender());
150 157 }
151 158
152   - @Override
153   - public void onMsg(ToPluginActorMsg msg) {
154   - log.trace("Processing plugin rpc msg: {}", msg);
155   - appActor.tell(msg, ActorRef.noSender());
156   - }
157   -
158   - @Override
159   - public void onMsg(DeviceToDeviceActorMsg msg) {
160   - log.trace("Processing device rpc msg: {}", msg);
161   - appActor.tell(msg, ActorRef.noSender());
162   - }
163   -
164   - @Override
165   - public void onMsg(ToDeviceActorNotificationMsg msg) {
166   - log.trace("Processing notification rpc msg: {}", msg);
167   - appActor.tell(msg, ActorRef.noSender());
168   - }
169   -
170   - @Override
171   - public void onMsg(ToDeviceSessionActorMsg msg) {
172   - log.trace("Processing session rpc msg: {}", msg);
173   - sessionManagerActor.tell(msg, ActorRef.noSender());
174   - }
175   -
176   - @Override
177   - public void onMsg(ToAllNodesMsg msg) {
178   - log.trace("Processing broadcast rpc msg: {}", msg);
179   - appActor.tell(msg, ActorRef.noSender());
180   - }
181 159
182   - @Override
183   - public void onMsg(RpcSessionCreateRequestMsg msg) {
184   - log.trace("Processing session create msg: {}", msg);
185   - rpcManagerActor.tell(msg, ActorRef.noSender());
186   - }
187   -
188   - @Override
189   - public void onMsg(RpcSessionTellMsg msg) {
190   - log.trace("Processing session rpc msg: {}", msg);
191   - rpcManagerActor.tell(msg, ActorRef.noSender());
192   - }
193   -
194   - @Override
195   - public void onMsg(RpcBroadcastMsg msg) {
196   - log.trace("Processing broadcast rpc msg: {}", msg);
197   - rpcManagerActor.tell(msg, ActorRef.noSender());
198   - }
199 160
200 161 @Override
201 162 public void onServerAdded(ServerInstance server) {
... ... @@ -223,28 +184,29 @@ public class DefaultActorService implements ActorService {
223 184 @Override
224 185 public void onCredentialsUpdate(TenantId tenantId, DeviceId deviceId) {
225 186 DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId);
226   - Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
227   - if (address.isPresent()) {
228   - rpcService.tell(address.get(), msg);
229   - } else {
230   - onMsg(msg);
231   - }
  187 + appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
232 188 }
233 189
234 190 @Override
235 191 public void onDeviceNameOrTypeUpdate(TenantId tenantId, DeviceId deviceId, String deviceName, String deviceType) {
236 192 log.trace("[{}] Processing onDeviceNameOrTypeUpdate event, deviceName: {}, deviceType: {}", deviceId, deviceName, deviceType);
237 193 DeviceNameOrTypeUpdateMsg msg = new DeviceNameOrTypeUpdateMsg(tenantId, deviceId, deviceName, deviceType);
238   - Optional<ServerAddress> address = actorContext.getRoutingService().resolveById(deviceId);
239   - if (address.isPresent()) {
240   - rpcService.tell(address.get(), msg);
241   - } else {
242   - onMsg(msg);
243   - }
  194 + appActor.tell(new SendToClusterMsg(deviceId, msg), ActorRef.noSender());
  195 + }
  196 +
  197 + @Override
  198 + public void onMsg(ServiceToRuleEngineMsg msg) {
  199 + appActor.tell(msg, ActorRef.noSender());
244 200 }
245 201
246 202 public void broadcast(ToAllNodesMsg msg) {
247   - rpcService.broadcast(msg);
  203 + actorContext.getEncodingService().encode(msg);
  204 + rpcService.broadcast(new RpcBroadcastMsg(ClusterAPIProtos.ClusterMessage
  205 + .newBuilder()
  206 + .setPayload(ByteString
  207 + .copyFrom(actorContext.getEncodingService().encode(msg)))
  208 + .setMessageType(CLUSTER_NETWORK_SERVER_DATA_MESSAGE)
  209 + .build()));
248 210 appActor.tell(msg, ActorRef.noSender());
249 211 }
250 212
... ... @@ -253,4 +215,37 @@ public class DefaultActorService implements ActorService {
253 215 this.sessionManagerActor.tell(msg, ActorRef.noSender());
254 216 this.rpcManagerActor.tell(msg, ActorRef.noSender());
255 217 }
  218 +
  219 + @Override
  220 + public void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg) {
  221 + switch(msg.getMessageType()) {
  222 + case CLUSTER_NETWORK_SERVER_DATA_MESSAGE:
  223 + java.util.Optional<TbActorMsg> decodedMsg = actorContext.getEncodingService()
  224 + .decode(msg.getPayload().toByteArray());
  225 + if (decodedMsg.isPresent()) {
  226 + appActor.tell(decodedMsg.get(), ActorRef.noSender());
  227 + } else {
  228 + log.error("Error during decoding cluster proto message");
  229 + }
  230 + break;
  231 + case TO_ALL_NODES_MSG:
  232 + //ToDo
  233 + break;
  234 + }
  235 + }
  236 +
  237 + @Override
  238 + public void onSendMsg(ClusterAPIProtos.ClusterMessage msg) {
  239 + rpcManagerActor.tell(msg, ActorRef.noSender());
  240 + }
  241 +
  242 + @Override
  243 + public void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg) {
  244 + rpcManagerActor.tell(msg, ActorRef.noSender());
  245 + }
  246 +
  247 + @Override
  248 + public void onBroadcastMsg(RpcBroadcastMsg msg) {
  249 + rpcManagerActor.tell(msg, ActorRef.noSender());
  250 + }
256 251 }
... ...
... ... @@ -21,6 +21,7 @@ import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
21 21 import org.thingsboard.server.common.data.id.DeviceId;
22 22 import org.thingsboard.server.common.data.id.SessionId;
23 23 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
  24 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
24 25 import org.thingsboard.server.common.msg.cluster.ServerAddress;
25 26 import org.thingsboard.server.common.msg.device.BasicDeviceToDeviceActorMsg;
26 27 import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
... ... @@ -87,22 +88,19 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP
87 88 }
88 89
89 90 protected Optional<ServerAddress> forwardToAppActorIfAdressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
  91 +
90 92 Optional<ServerAddress> newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId());
91 93 if (!newAddress.equals(oldAddress)) {
92   - if (newAddress.isPresent()) {
93   - systemContext.getRpcService().tell(newAddress.get(),
94   - toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
95   - } else {
96   - getAppActor().tell(toForward, ctx.self());
97   - }
  94 + getAppActor().tell(new SendToClusterMsg(toForward.getDeviceId(), toForward
  95 + .toOtherAddress(systemContext.getRoutingService().getCurrentServer())), ctx.self());
98 96 }
99 97 return newAddress;
100 98 }
101 99
102 100 protected void forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> address) {
103 101 if (address.isPresent()) {
104   - systemContext.getRpcService().tell(address.get(),
105   - toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()));
  102 + systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(address.get(),
  103 + toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer())));
106 104 } else {
107 105 getAppActor().tell(toForward, ctx.self());
108 106 }
... ...
... ... @@ -20,7 +20,7 @@ import lombok.EqualsAndHashCode;
20 20 import lombok.Getter;
21 21 import lombok.ToString;
22 22 import org.thingsboard.server.common.msg.cluster.ServerAddress;
23   -import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
  23 +import org.thingsboard.server.gen.discovery.ServerInstanceProtos;
24 24
25 25 /**
26 26 * @author Andrew Shvayka
... ... @@ -29,8 +29,6 @@ import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
29 29 @EqualsAndHashCode(exclude = {"serverInfo", "serverAddress"})
30 30 public final class ServerInstance implements Comparable<ServerInstance> {
31 31
32   - @Getter(AccessLevel.PACKAGE)
33   - private final ServerInfo serverInfo;
34 32 @Getter
35 33 private final String host;
36 34 @Getter
... ... @@ -38,8 +36,13 @@ public final class ServerInstance implements Comparable<ServerInstance> {
38 36 @Getter
39 37 private final ServerAddress serverAddress;
40 38
41   - public ServerInstance(ServerInfo serverInfo) {
42   - this.serverInfo = serverInfo;
  39 + public ServerInstance(ServerAddress serverAddress) {
  40 + this.serverAddress = serverAddress;
  41 + this.host = serverAddress.getHost();
  42 + this.port = serverAddress.getPort();
  43 + }
  44 +
  45 + public ServerInstance(ServerInstanceProtos.ServerInfo serverInfo) {
43 46 this.host = serverInfo.getHost();
44 47 this.port = serverInfo.getPort();
45 48 this.serverAddress = new ServerAddress(host, port);
... ...
... ... @@ -15,8 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.service.cluster.discovery;
17 17
18   -import com.google.protobuf.InvalidProtocolBufferException;
19 18 import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.commons.lang3.SerializationException;
  20 +import org.apache.commons.lang3.SerializationUtils;
20 21 import org.apache.curator.framework.CuratorFramework;
21 22 import org.apache.curator.framework.CuratorFrameworkFactory;
22 23 import org.apache.curator.framework.recipes.cache.ChildData;
... ... @@ -33,13 +34,13 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
33 34 import org.springframework.context.ApplicationListener;
34 35 import org.springframework.stereotype.Service;
35 36 import org.springframework.util.Assert;
36   -import org.thingsboard.server.gen.discovery.ServerInstanceProtos.ServerInfo;
  37 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
37 38 import org.thingsboard.server.utils.MiscUtils;
38 39
39 40 import javax.annotation.PostConstruct;
40 41 import javax.annotation.PreDestroy;
41   -import java.io.IOException;
42 42 import java.util.List;
  43 +import java.util.NoSuchElementException;
43 44 import java.util.concurrent.CopyOnWriteArrayList;
44 45 import java.util.stream.Collectors;
45 46
... ... @@ -113,7 +114,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
113 114 log.info("[{}:{}] Creating ZK node for current instance", self.getHost(), self.getPort());
114 115 nodePath = client.create()
115 116 .creatingParentsIfNeeded()
116   - .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", self.getServerInfo().toByteArray());
  117 + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkNodesDir + "/", SerializationUtils.serialize(self.getServerAddress()));
117 118 log.info("[{}:{}] Created ZK node for current instance: {}", self.getHost(), self.getPort(), nodePath);
118 119 } catch (Exception e) {
119 120 log.error("Failed to create ZK node", e);
... ... @@ -144,8 +145,8 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
144 145 .filter(cd -> !cd.getPath().equals(nodePath))
145 146 .map(cd -> {
146 147 try {
147   - return new ServerInstance(ServerInfo.parseFrom(cd.getData()));
148   - } catch (InvalidProtocolBufferException e) {
  148 + return new ServerInstance( (ServerAddress) SerializationUtils.deserialize(cd.getData()));
  149 + } catch (NoSuchElementException e) {
149 150 log.error("Failed to decode ZK node", e);
150 151 throw new RuntimeException(e);
151 152 }
... ... @@ -186,8 +187,9 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
186 187 }
187 188 ServerInstance instance;
188 189 try {
189   - instance = new ServerInstance(ServerInfo.parseFrom(data.getData()));
190   - } catch (IOException e) {
  190 + ServerAddress serverAddress = SerializationUtils.deserialize(data.getData());
  191 + instance = new ServerInstance(serverAddress);
  192 + } catch (SerializationException e) {
191 193 log.error("Failed to decode server instance for node {}", data.getPath(), e);
192 194 throw e;
193 195 }
... ...
... ... @@ -25,26 +25,20 @@ import org.springframework.stereotype.Service;
25 25 import org.springframework.util.SerializationUtils;
26 26 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
27 27 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
28   -import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
29 28 import org.thingsboard.server.common.data.id.EntityId;
30   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
31   -import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
32 29 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
33   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
34   -import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
35   -import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
36   -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
37   -import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
38   -import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
  30 +
39 31 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  32 +
40 33 import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
41 34 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
42 35 import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
43   -import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
44 36
45 37 import javax.annotation.PreDestroy;
46 38 import java.io.IOException;
47 39 import java.util.UUID;
  40 +import java.util.concurrent.ArrayBlockingQueue;
  41 +import java.util.concurrent.BlockingQueue;
48 42 import java.util.concurrent.ConcurrentHashMap;
49 43 import java.util.concurrent.ConcurrentMap;
50 44
... ... @@ -64,7 +58,8 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
64 58
65 59 private ServerInstance instance;
66 60
67   - private ConcurrentMap<UUID, RpcSessionCreationFuture> pendingSessionMap = new ConcurrentHashMap<>();
  61 + private ConcurrentMap<UUID, BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>>> pendingSessionMap =
  62 + new ConcurrentHashMap<>();
68 63
69 64 public void init(RpcMsgListener listener) {
70 65 this.listener = listener;
... ... @@ -82,11 +77,11 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
82 77 }
83 78
84 79 @Override
85   - public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> msg) {
86   - RpcSessionCreationFuture future = pendingSessionMap.remove(msgUid);
87   - if (future != null) {
  80 + public void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream) {
  81 + BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = pendingSessionMap.remove(msgUid);
  82 + if (queue != null) {
88 83 try {
89   - future.onMsg(msg);
  84 + queue.put(inputStream);
90 85 } catch (InterruptedException e) {
91 86 log.warn("Failed to report created session!");
92 87 Thread.currentThread().interrupt();
... ... @@ -97,11 +92,13 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
97 92 }
98 93
99 94 @Override
100   - public StreamObserver<ClusterAPIProtos.ToRpcServerMessage> handlePluginMsgs(StreamObserver<ClusterAPIProtos.ToRpcServerMessage> responseObserver) {
  95 + public StreamObserver<ClusterAPIProtos.ClusterMessage> handleMsgs(
  96 + StreamObserver<ClusterAPIProtos.ClusterMessage> responseObserver) {
101 97 log.info("Processing new session.");
102 98 return createSession(new RpcSessionCreateRequestMsg(UUID.randomUUID(), null, responseObserver));
103 99 }
104 100
  101 +
105 102 @PreDestroy
106 103 public void stop() {
107 104 if (server != null) {
... ... @@ -117,65 +114,18 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
117 114 }
118 115 }
119 116
120   - @Override
121   - public void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward) {
122   - ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
123   - .setToDeviceActorRpcMsg(toProtoMsg(toForward)).build();
124   - tell(serverAddress, msg);
125   - }
126   -
127   - @Override
128   - public void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward) {
129   - ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
130   - .setToDeviceActorNotificationRpcMsg(toProtoMsg(toForward)).build();
131   - tell(serverAddress, msg);
132   - }
133   -
134   - @Override
135   - public void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward) {
136   - ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
137   - .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
138   - tell(serverAddress, msg);
139   - }
140   -
141   - @Override
142   - public void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward) {
143   - ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
144   - .setToPluginRpcResponseRpcMsg(toProtoMsg(toForward)).build();
145   - tell(serverAddress, msg);
146   - }
147   -
148   - @Override
149   - public void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward) {
150   - ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
151   - .setToDeviceSessionActorRpcMsg(toProtoMsg(toForward)).build();
152   - tell(serverAddress, msg);
153   - }
154 117
155 118 @Override
156   - public void tell(PluginRpcMsg toForward) {
157   - ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
158   - .setToPluginRpcMsg(toProtoMsg(toForward)).build();
159   - tell(toForward.getRpcMsg().getServerAddress(), msg);
160   - }
161   -
162   - @Override
163   - public void broadcast(ToAllNodesMsg toForward) {
164   - ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
165   - .setToAllNodesRpcMsg(toProtoMsg(toForward)).build();
166   - listener.onMsg(new RpcBroadcastMsg(msg));
167   - }
168   -
169   - private void tell(ServerAddress serverAddress, ClusterAPIProtos.ToRpcServerMessage msg) {
170   - listener.onMsg(new RpcSessionTellMsg(serverAddress, msg));
  119 + public void broadcast(RpcBroadcastMsg msg) {
  120 + listener.onBroadcastMsg(msg);
171 121 }
172 122
173   - private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> createSession(RpcSessionCreateRequestMsg msg) {
174   - RpcSessionCreationFuture future = new RpcSessionCreationFuture();
175   - pendingSessionMap.put(msg.getMsgUid(), future);
176   - listener.onMsg(msg);
  123 + private StreamObserver<ClusterAPIProtos.ClusterMessage> createSession(RpcSessionCreateRequestMsg msg) {
  124 + BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> queue = new ArrayBlockingQueue<>(1);
  125 + pendingSessionMap.put(msg.getMsgUid(), queue);
  126 + listener.onRpcSessionCreateRequestMsg(msg);
177 127 try {
178   - StreamObserver<ClusterAPIProtos.ToRpcServerMessage> observer = future.get();
  128 + StreamObserver<ClusterAPIProtos.ClusterMessage> observer = queue.take();
179 129 log.info("Processed new session.");
180 130 return observer;
181 131 } catch (Exception e) {
... ... @@ -184,76 +134,10 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
184 134 }
185 135 }
186 136
187   - private static ClusterAPIProtos.ToDeviceActorRpcMessage toProtoMsg(DeviceToDeviceActorMsg msg) {
188   - return ClusterAPIProtos.ToDeviceActorRpcMessage.newBuilder().setData(
189   - ByteString.copyFrom(SerializationUtils.serialize(msg))
190   - ).build();
191   - }
192   -
193   - private static ClusterAPIProtos.ToDeviceActorNotificationRpcMessage toProtoMsg(ToDeviceActorNotificationMsg msg) {
194   - return ClusterAPIProtos.ToDeviceActorNotificationRpcMessage.newBuilder().setData(
195   - ByteString.copyFrom(SerializationUtils.serialize(msg))
196   - ).build();
197   - }
198   -
199   - private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestActorMsg msg) {
200   - ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
201   - ToDeviceRpcRequest request = msg.getMsg();
202   -
203   - builder.setDeviceTenantId(toUid(msg.getTenantId()));
204   - builder.setDeviceId(toUid(msg.getDeviceId()));
205   -
206   - builder.setMsgId(toUid(request.getId()));
207   - builder.setOneway(request.isOneway());
208   - builder.setExpTime(request.getExpirationTime());
209   - builder.setMethod(request.getBody().getMethod());
210   - builder.setParams(request.getBody().getParams());
211   -
212   - return builder.build();
213   - }
214   -
215   - private static ClusterAPIProtos.ToPluginRpcResponseRpcMessage toProtoMsg(ToPluginRpcResponseDeviceMsg msg) {
216   - ClusterAPIProtos.ToPluginRpcResponseRpcMessage.Builder builder = ClusterAPIProtos.ToPluginRpcResponseRpcMessage.newBuilder();
217   - FromDeviceRpcResponse request = msg.getResponse();
218   -
219   - builder.setMsgId(toUid(request.getId()));
220   - request.getResponse().ifPresent(builder::setResponse);
221   - request.getError().ifPresent(e -> builder.setError(e.name()));
222   -
223   - return builder.build();
224   - }
225   -
226   - private ClusterAPIProtos.ToAllNodesRpcMessage toProtoMsg(ToAllNodesMsg msg) {
227   - return ClusterAPIProtos.ToAllNodesRpcMessage.newBuilder().setData(
228   - ByteString.copyFrom(SerializationUtils.serialize(msg))
229   - ).build();
230   - }
231   -
232   -
233   - private ClusterAPIProtos.ToPluginRpcMessage toProtoMsg(PluginRpcMsg msg) {
234   - return ClusterAPIProtos.ToPluginRpcMessage.newBuilder()
235   - .setClazz(msg.getRpcMsg().getMsgClazz())
236   - .setData(ByteString.copyFrom(msg.getRpcMsg().getMsgData()))
237   - .setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
238   - .setTenantId(toUid(msg.getPluginTenantId().getId()))
239   - .setPluginId(toUid(msg.getPluginId().getId()))
240   - .build()
241   - ).build();
242   - }
243   -
244   - private static ClusterAPIProtos.Uid toUid(EntityId uuid) {
245   - return toUid(uuid.getId());
246   - }
247   -
248   - private static ClusterAPIProtos.Uid toUid(UUID uuid) {
249   - return ClusterAPIProtos.Uid.newBuilder().setPluginUuidMsb(uuid.getMostSignificantBits()).setPluginUuidLsb(
250   - uuid.getLeastSignificantBits()).build();
  137 + @Override
  138 + public void tell(ClusterAPIProtos.ClusterMessage message) {
  139 + listener.onSendMsg(message);
251 140 }
252 141
253   - private static ClusterAPIProtos.ToDeviceSessionActorRpcMessage toProtoMsg(ToDeviceSessionActorMsg msg) {
254   - return ClusterAPIProtos.ToDeviceSessionActorRpcMessage.newBuilder().setData(
255   - ByteString.copyFrom(SerializationUtils.serialize(msg))
256   - ).build();
257   - }
258 142
259 143 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.service.cluster.rpc;
17 17
18 18 import io.grpc.stub.StreamObserver;
  19 +import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
19 20 import org.thingsboard.server.common.msg.cluster.ServerAddress;
20 21 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
21 22 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
... ... @@ -35,20 +36,10 @@ public interface ClusterRpcService {
35 36
36 37 void init(RpcMsgListener listener);
37 38
38   - void tell(ServerAddress serverAddress, DeviceToDeviceActorMsg toForward);
  39 + void broadcast(RpcBroadcastMsg msg);
39 40
40   - void tell(ServerAddress serverAddress, ToDeviceSessionActorMsg toForward);
  41 + void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream);
41 42
42   - void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
43   -
44   - void tell(ServerAddress serverAddress, ToDeviceRpcRequestActorMsg toForward);
45   -
46   - void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
47   -
48   - void tell(PluginRpcMsg toForward);
49   -
50   - void broadcast(ToAllNodesMsg msg);
51   -
52   - void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream);
  43 + void tell(ClusterAPIProtos.ClusterMessage message);
53 44
54 45 }
... ...
... ... @@ -33,8 +33,8 @@ public final class GrpcSession implements Closeable {
33 33 private final UUID sessionId;
34 34 private final boolean client;
35 35 private final GrpcSessionListener listener;
36   - private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream;
37   - private StreamObserver<ClusterAPIProtos.ToRpcServerMessage> outputStream;
  36 + private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
  37 + private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
38 38
39 39 private boolean connected;
40 40 private ServerAddress remoteServer;
... ... @@ -56,17 +56,17 @@ public final class GrpcSession implements Closeable {
56 56 }
57 57
58 58 public void initInputStream() {
59   - this.inputStream = new StreamObserver<ClusterAPIProtos.ToRpcServerMessage>() {
  59 + this.inputStream = new StreamObserver<ClusterAPIProtos.ClusterMessage>() {
60 60 @Override
61   - public void onNext(ClusterAPIProtos.ToRpcServerMessage msg) {
62   - if (!connected && msg.hasConnectMsg()) {
  61 + public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) {
  62 + if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) {
63 63 connected = true;
64   - ClusterAPIProtos.ServerAddress rpcAddress = msg.getConnectMsg().getServerAddress();
  64 + ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAdresss().getHost(), clusterMessage.getServerAdresss().getPort());
65 65 remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort());
66 66 listener.onConnected(GrpcSession.this);
67 67 }
68 68 if (connected) {
69   - handleToRpcServerMessage(msg);
  69 + listener.onReceiveClusterGrpcMsg(GrpcSession.this, clusterMessage);
70 70 }
71 71 }
72 72
... ... @@ -83,37 +83,13 @@ public final class GrpcSession implements Closeable {
83 83 };
84 84 }
85 85
86   - private void handleToRpcServerMessage(ClusterAPIProtos.ToRpcServerMessage msg) {
87   - if (msg.hasToPluginRpcMsg()) {
88   - listener.onToPluginRpcMsg(GrpcSession.this, msg.getToPluginRpcMsg());
89   - }
90   - if (msg.hasToDeviceActorRpcMsg()) {
91   - listener.onToDeviceActorRpcMsg(GrpcSession.this, msg.getToDeviceActorRpcMsg());
92   - }
93   - if (msg.hasToDeviceSessionActorRpcMsg()) {
94   - listener.onToDeviceSessionActorRpcMsg(GrpcSession.this, msg.getToDeviceSessionActorRpcMsg());
95   - }
96   - if (msg.hasToDeviceActorNotificationRpcMsg()) {
97   - listener.onToDeviceActorNotificationRpcMsg(GrpcSession.this, msg.getToDeviceActorNotificationRpcMsg());
98   - }
99   - if (msg.hasToDeviceRpcRequestRpcMsg()) {
100   - listener.onToDeviceRpcRequestRpcMsg(GrpcSession.this, msg.getToDeviceRpcRequestRpcMsg());
101   - }
102   - if (msg.hasToPluginRpcResponseRpcMsg()) {
103   - listener.onFromDeviceRpcResponseRpcMsg(GrpcSession.this, msg.getToPluginRpcResponseRpcMsg());
104   - }
105   - if (msg.hasToAllNodesRpcMsg()) {
106   - listener.onToAllNodesRpcMessage(GrpcSession.this, msg.getToAllNodesRpcMsg());
107   - }
108   - }
109   -
110 86 public void initOutputStream() {
111 87 if (client) {
112 88 listener.onConnected(GrpcSession.this);
113 89 }
114 90 }
115 91
116   - public void sendMsg(ClusterAPIProtos.ToRpcServerMessage msg) {
  92 + public void sendMsg(ClusterAPIProtos.ClusterMessage msg) {
117 93 outputStream.onNext(msg);
118 94 }
119 95
... ...
... ... @@ -26,20 +26,7 @@ public interface GrpcSessionListener {
26 26
27 27 void onDisconnected(GrpcSession session);
28 28
29   - void onToPluginRpcMsg(GrpcSession session, ClusterAPIProtos.ToPluginRpcMessage msg);
30   -
31   - void onToDeviceActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceActorRpcMessage msg);
32   -
33   - void onToDeviceActorNotificationRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceActorNotificationRpcMessage msg);
34   -
35   - void onToDeviceSessionActorRpcMsg(GrpcSession session, ClusterAPIProtos.ToDeviceSessionActorRpcMessage msg);
36   -
37   - void onToAllNodesRpcMessage(GrpcSession grpcSession, ClusterAPIProtos.ToAllNodesRpcMessage toAllNodesRpcMessage);
38   -
39   - void onToDeviceRpcRequestRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg);
40   -
41   - void onFromDeviceRpcResponseRpcMsg(GrpcSession grpcSession, ClusterAPIProtos.ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg);
  29 + void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage);
42 30
43 31 void onError(GrpcSession session, Throwable t);
44   -
45 32 }
... ...
... ... @@ -17,32 +17,15 @@ package org.thingsboard.server.service.cluster.rpc;
17 17
18 18 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
19 19 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
20   -import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
21   -import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
22   -import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
23   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
24   -import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
25   -import org.thingsboard.server.extensions.api.plugins.msg.ToPluginActorMsg;
  20 +import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
26 21
27 22 /**
28 23 * @author Andrew Shvayka
29 24 */
30   -public interface RpcMsgListener {
31   -
32   - void onMsg(DeviceToDeviceActorMsg msg);
33   -
34   - void onMsg(ToDeviceActorNotificationMsg msg);
35   -
36   - void onMsg(ToDeviceSessionActorMsg msg);
37   -
38   - void onMsg(ToAllNodesMsg nodeMsg);
39   -
40   - void onMsg(ToPluginActorMsg msg);
41   -
42   - void onMsg(RpcSessionCreateRequestMsg msg);
43   -
44   - void onMsg(RpcSessionTellMsg rpcSessionTellMsg);
45   -
46   - void onMsg(RpcBroadcastMsg rpcBroadcastMsg);
47 25
  26 +public interface RpcMsgListener {
  27 + void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg);
  28 + void onSendMsg(ClusterAPIProtos.ClusterMessage msg);
  29 + void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg);
  30 + void onBroadcastMsg(RpcBroadcastMsg msg);
48 31 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.encoding;
  17 +
  18 +import org.thingsboard.server.common.msg.TbActorMsg;
  19 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  20 +import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  21 +
  22 +import java.util.Optional;
  23 +
  24 +public interface DataDecodingEncodingService {
  25 +
  26 + Optional<TbActorMsg> decode(byte[] byteArray);
  27 +
  28 + byte[] encode(TbActorMsg msq);
  29 +
  30 + ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
  31 + TbActorMsg msg);
  32 +
  33 +}
  34 +
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.encoding;
  17 +
  18 +import com.google.protobuf.ByteString;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.stereotype.Service;
  21 +import org.springframework.util.SerializationUtils;
  22 +import org.thingsboard.server.common.msg.TbActorMsg;
  23 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  24 +import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  25 +
  26 +import java.util.Optional;
  27 +
  28 +import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE;
  29 +
  30 +
  31 +@Slf4j
  32 +@Service
  33 +public class ProtoWithJavaSerializationDecodingEncodingService implements DataDecodingEncodingService {
  34 +
  35 +
  36 + @Override
  37 + public Optional<TbActorMsg> decode(byte[] byteArray) {
  38 + try {
  39 + TbActorMsg msg = (TbActorMsg) SerializationUtils.deserialize(byteArray);
  40 + return Optional.of(msg);
  41 +
  42 + } catch (IllegalArgumentException e) {
  43 + log.error("Error during deserialization message, [{}]", e.getMessage());
  44 + return Optional.empty();
  45 + }
  46 + }
  47 +
  48 + @Override
  49 + public byte[] encode(TbActorMsg msq) {
  50 + return SerializationUtils.serialize(msq);
  51 + }
  52 +
  53 + @Override
  54 + public ClusterAPIProtos.ClusterMessage convertToProtoDataMessage(ServerAddress serverAddress,
  55 + TbActorMsg msg) {
  56 + return ClusterAPIProtos.ClusterMessage
  57 + .newBuilder()
  58 + .setServerAdresss(ClusterAPIProtos.ServerAddress
  59 + .newBuilder()
  60 + .setHost(serverAddress.getHost())
  61 + .setPort(serverAddress.getPort())
  62 + .build())
  63 + .setMessageType(CLUSTER_NETWORK_SERVER_DATA_MESSAGE)
  64 + .setPayload(ByteString.copyFrom(encode(msg))).build();
  65 +
  66 + }
  67 +}
... ...
... ... @@ -30,6 +30,8 @@ import org.thingsboard.server.common.data.id.EntityId;
30 30 import org.thingsboard.server.common.data.id.TenantId;
31 31 import org.thingsboard.server.common.data.id.UUIDBased;
32 32 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
  33 +import org.thingsboard.server.common.msg.TbActorMsg;
  34 +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
33 35 import org.thingsboard.server.common.msg.cluster.ServerAddress;
34 36 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
35 37 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
... ... @@ -38,6 +40,7 @@ import org.thingsboard.server.dao.audit.AuditLogService;
38 40 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
39 41 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
40 42 import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
  43 +import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
41 44 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
42 45 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
43 46 import org.thingsboard.server.service.security.model.SecurityUser;
... ... @@ -135,23 +138,16 @@ public class DefaultDeviceRpcService implements DeviceRpcService {
135 138 @Override
136 139 public void sendRpcReplyToDevice(TenantId tenantId, DeviceId deviceId, int requestId, String body) {
137 140 ToServerRpcResponseActorMsg rpcMsg = new ToServerRpcResponseActorMsg(tenantId, deviceId, new ToServerRpcResponseMsg(requestId, body));
138   - forward(deviceId, rpcMsg, rpcService::tell);
  141 + forward(deviceId, rpcMsg);
139 142 }
140 143
141 144 private void sendRpcRequest(ToDeviceRpcRequest msg) {
142 145 log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
143 146 ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(msg);
144   - forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
  147 + forward(msg.getDeviceId(), rpcMsg);
145 148 }
146 149
147   - private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg, BiConsumer<ServerAddress, T> rpcFunction) {
148   - Optional<ServerAddress> instance = routingService.resolveById(deviceId);
149   - if (instance.isPresent()) {
150   - log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
151   - rpcFunction.accept(instance.get(), msg);
152   - } else {
153   - log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg);
154   - actorService.onMsg(msg);
155   - }
  150 + private <T extends ToDeviceActorNotificationMsg> void forward(DeviceId deviceId, T msg) {
  151 + actorService.onMsg(new SendToClusterMsg(deviceId, msg));
156 152 }
157 153 }
... ...
... ... @@ -19,79 +19,73 @@ package cluster;
19 19 option java_package = "org.thingsboard.server.gen.cluster";
20 20 option java_outer_classname = "ClusterAPIProtos";
21 21
22   -message ServerAddress {
23   - string host = 1;
24   - int32 port = 2;
25   -}
26   -
27   -message Uid {
28   - sint64 pluginUuidMsb = 1;
29   - sint64 pluginUuidLsb = 2;
30   -}
31   -
32   -message PluginAddress {
33   - Uid pluginId = 1;
34   - Uid tenantId = 2;
35   -}
36   -
37   -message ToPluginRpcMessage {
38   - PluginAddress address = 1;
39   - int32 clazz = 2;
40   - bytes data = 3;
41   -}
42   -
43   -message ToDeviceActorRpcMessage {
44   - bytes data = 1;
45   -}
46   -
47   -message ToDeviceSessionActorRpcMessage {
48   - bytes data = 1;
49   -}
50   -
51   -message ToDeviceActorNotificationRpcMessage {
52   - bytes data = 1;
53   -}
54   -
55   -message ToAllNodesRpcMessage {
56   - bytes data = 1;
57   -}
58   -
59   -message ConnectRpcMessage {
60   - ServerAddress serverAddress = 1;
61   -}
62   -
63   -message ToDeviceRpcRequestRpcMessage {
64   - Uid deviceTenantId = 2;
65   - Uid deviceId = 3;
66   -
67   - Uid msgId = 4;
68   - bool oneway = 5;
69   - int64 expTime = 6;
70   - string method = 7;
71   - string params = 8;
72   -}
73   -
74   -message ToPluginRpcResponseRpcMessage {
75   - Uid msgId = 2;
76   - string response = 3;
77   - string error = 4;
78   -}
79   -
80   -message ToRpcServerMessage {
81   - ConnectRpcMessage connectMsg = 1;
82   - ToPluginRpcMessage toPluginRpcMsg = 2;
83   - ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
84   - ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
85   - ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
86   - ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
87   - ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
88   - ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
89   -}
  22 +//message Uid {
  23 +// sint64 pluginUuidMsb = 1;
  24 +// sint64 pluginUuidLsb = 2;
  25 +//}
  26 +//
  27 +//message PluginAddress {
  28 +// Uid pluginId = 1;
  29 +// Uid tenantId = 2;
  30 +//}
  31 +//
  32 +//message ToPluginRpcMessage {
  33 +// PluginAddress address = 1;
  34 +// int32 clazz = 2;
  35 +// bytes data = 3;
  36 +//}
  37 +//
  38 +//message ToDeviceActorRpcMessage {
  39 +// bytes data = 1;
  40 +//}
  41 +//
  42 +//message ToDeviceSessionActorRpcMessage {
  43 +// bytes data = 1;
  44 +//}
  45 +//
  46 +//message ToDeviceActorNotificationRpcMessage {
  47 +// bytes data = 1;
  48 +//}
  49 +//
  50 +//message ToAllNodesRpcMessage {
  51 +// bytes data = 1;
  52 +//}
  53 +//
  54 +//message ConnectRpcMessage {
  55 +// ServerAddress serverAddress = 1;
  56 +//}
  57 +//
  58 +//message ToDeviceRpcRequestRpcMessage {
  59 +// Uid deviceTenantId = 2;
  60 +// Uid deviceId = 3;
  61 +//
  62 +// Uid msgId = 4;
  63 +// bool oneway = 5;
  64 +// int64 expTime = 6;
  65 +// string method = 7;
  66 +// string params = 8;
  67 +//}
  68 +//
  69 +//message ToPluginRpcResponseRpcMessage {
  70 +// Uid msgId = 2;
  71 +// string response = 3;
  72 +// string error = 4;
  73 +//}
  74 +//
  75 +//message ToRpcServerMessage {
  76 +// ConnectRpcMessage connectMsg = 1;
  77 +// ToPluginRpcMessage toPluginRpcMsg = 2;
  78 +// ToDeviceActorRpcMessage toDeviceActorRpcMsg = 3;
  79 +// ToDeviceSessionActorRpcMessage toDeviceSessionActorRpcMsg = 4;
  80 +// ToDeviceActorNotificationRpcMessage toDeviceActorNotificationRpcMsg = 5;
  81 +// ToAllNodesRpcMessage toAllNodesRpcMsg = 6;
  82 +// ToDeviceRpcRequestRpcMessage toDeviceRpcRequestRpcMsg = 7;
  83 +// ToPluginRpcResponseRpcMessage toPluginRpcResponseRpcMsg = 8;
  84 +//}
90 85
91 86 service ClusterRpcService {
92   - rpc handlePluginMsgs(stream ToRpcServerMessage) returns (stream ToRpcServerMessage) {}
  87 + rpc handleMsgs(stream ClusterMessage) returns (stream ClusterMessage) {}
93 88 }
94   -
95 89 message ClusterMessage {
96 90 MessageType messageType = 1;
97 91 MessageMataInfo messageMetaInfo = 2;
... ...
... ... @@ -29,6 +29,11 @@ public enum MsgType {
29 29 CLUSTER_EVENT_MSG,
30 30
31 31 /**
  32 + * All messages, could be send to cluster
  33 + */
  34 + SEND_TO_CLUSTER_MSG,
  35 +
  36 + /**
32 37 * ADDED/UPDATED/DELETED events for main entities.
33 38 *
34 39 * See {@link org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.cluster;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.common.data.id.DeviceId;
  20 +import org.thingsboard.server.common.data.id.EntityId;
  21 +import org.thingsboard.server.common.msg.MsgType;
  22 +import org.thingsboard.server.common.msg.TbActorMsg;
  23 +
  24 +@Data
  25 +public class SendToClusterMsg implements TbActorMsg {
  26 +
  27 + private TbActorMsg msg;
  28 + private EntityId entityId;
  29 +
  30 + public SendToClusterMsg(EntityId entityId, TbActorMsg msg) {
  31 + this.entityId = entityId;
  32 + this.msg = msg;
  33 + }
  34 +
  35 +
  36 + @Override
  37 + public MsgType getMsgType() {
  38 + return MsgType.SEND_TO_CLUSTER_MSG;
  39 + }
  40 +}
... ...
... ... @@ -15,10 +15,12 @@
15 15 */
16 16 package org.thingsboard.server.common.msg.cluster;
17 17
  18 +import org.thingsboard.server.common.msg.TbActorMsg;
  19 +
18 20 import java.io.Serializable;
19 21
20 22 /**
21 23 * @author Andrew Shvayka
22 24 */
23   -public interface ToAllNodesMsg extends Serializable {
  25 +public interface ToAllNodesMsg extends Serializable, TbActorMsg {
24 26 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.common.msg.core;
17 17
18 18 import org.thingsboard.server.common.data.id.SessionId;
  19 +import org.thingsboard.server.common.msg.MsgType;
19 20 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
20 21
21 22 public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg {
... ... @@ -44,4 +45,8 @@ public class BasicToDeviceSessionActorMsg implements ToDeviceSessionActorMsg {
44 45 return "BasicToSessionResponseMsg [msg=" + msg + ", sessionId=" + sessionId + "]";
45 46 }
46 47
  48 + @Override
  49 + public MsgType getMsgType() {
  50 + return null;
  51 + }
47 52 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.common.msg.core;
17 17
  18 +import org.thingsboard.server.common.msg.TbActorMsg;
18 19 import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
19 20 import org.thingsboard.server.common.msg.session.ToDeviceMsg;
20 21
... ... @@ -23,7 +24,7 @@ import java.io.Serializable;
23 24 /**
24 25 * @author Andrew Shvayka
25 26 */
26   -public interface ToDeviceSessionActorMsg extends SessionAwareMsg, Serializable {
  27 +public interface ToDeviceSessionActorMsg extends SessionAwareMsg, Serializable, TbActorMsg {
27 28
28 29 ToDeviceMsg getMsg();
29 30 }
... ...
... ... @@ -20,7 +20,6 @@ import lombok.ToString;
20 20 import org.thingsboard.server.common.data.EntityType;
21 21 import org.thingsboard.server.common.data.id.*;
22 22 import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
23   -import org.thingsboard.server.common.data.rule.RuleChain;
24 23 import org.thingsboard.server.common.msg.MsgType;
25 24 import org.thingsboard.server.common.msg.TbActorMsg;
26 25 import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
... ...