Commit 2d063a9c8b90aef76eb529dd9bc0300e02a01205

Authored by Andrew Shvayka
1 parent 7563e030

Improved GRPC callbacks

... ... @@ -61,6 +61,7 @@ import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
61 61 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
62 62 import org.thingsboard.server.service.component.ComponentDiscoveryService;
63 63 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
  64 +import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
64 65 import org.thingsboard.server.service.executors.DbCallbackExecutorService;
65 66 import org.thingsboard.server.service.executors.ExternalCallExecutorService;
66 67 import org.thingsboard.server.service.mail.MailExecutorService;
... ... @@ -188,6 +189,10 @@ public class ActorSystemContext {
188 189
189 190 @Autowired
190 191 @Getter
  192 + private ClusterRpcCallbackExecutorService clusterRpcCallbackExecutor;
  193 +
  194 + @Autowired
  195 + @Getter
191 196 private DbCallbackExecutorService dbCallbackExecutor;
192 197
193 198 @Autowired
... ...
... ... @@ -17,10 +17,12 @@ package org.thingsboard.server.actors.rpc;
17 17
18 18 import akka.actor.ActorRef;
19 19 import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.actors.ActorSystemContext;
20 21 import org.thingsboard.server.actors.service.ActorService;
21 22 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
22 23 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
23 24 import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
  25 +import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService;
24 26
25 27 /**
26 28 * @author Andrew Shvayka
... ... @@ -28,12 +30,14 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
28 30 @Slf4j
29 31 public class BasicRpcSessionListener implements GrpcSessionListener {
30 32
  33 + private final ClusterRpcCallbackExecutorService callbackExecutorService;
31 34 private final ActorService service;
32 35 private final ActorRef manager;
33 36 private final ActorRef self;
34 37
35   - BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
36   - this.service = service;
  38 + BasicRpcSessionListener(ActorSystemContext context, ActorRef manager, ActorRef self) {
  39 + this.service = context.getActorService();
  40 + this.callbackExecutorService = context.getClusterRpcCallbackExecutor();
37 41 this.manager = manager;
38 42 this.self = self;
39 43 }
... ... @@ -55,7 +59,13 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
55 59 @Override
56 60 public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) {
57 61 log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage);
58   - service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
  62 + callbackExecutorService.execute(() -> {
  63 + try {
  64 + service.onReceivedMsg(session.getRemoteServer(), clusterMessage);
  65 + } catch (Exception e) {
  66 + log.debug("[{}][{}] Failed to process cluster message: {}", session.getRemoteServer(), getType(session), clusterMessage, e);
  67 + }
  68 + });
59 69 }
60 70
61 71 @Override
... ...
... ... @@ -16,7 +16,9 @@
16 16 package org.thingsboard.server.actors.rpc;
17 17
18 18 import akka.actor.ActorRef;
  19 +import akka.actor.OneForOneStrategy;
19 20 import akka.actor.Props;
  21 +import akka.actor.SupervisorStrategy;
20 22 import akka.event.Logging;
21 23 import akka.event.LoggingAdapter;
22 24 import lombok.extern.slf4j.Slf4j;
... ... @@ -30,6 +32,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
30 32 import org.thingsboard.server.common.msg.cluster.ServerType;
31 33 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
32 34 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
  35 +import scala.concurrent.duration.Duration;
33 36
34 37 import java.util.*;
35 38
... ... @@ -39,9 +42,7 @@ import java.util.*;
39 42 public class RpcManagerActor extends ContextAwareActor {
40 43
41 44 private final Map<ServerAddress, SessionActorInfo> sessionActors;
42   -
43 45 private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
44   -
45 46 private final ServerAddress instance;
46 47
47 48 private RpcManagerActor(ActorSystemContext systemContext) {
... ... @@ -63,7 +64,7 @@ public class RpcManagerActor extends ContextAwareActor {
63 64 }
64 65
65 66 @Override
66   - public void onReceive(Object msg) throws Exception {
  67 + public void onReceive(Object msg) {
67 68 if (msg instanceof ClusterAPIProtos.ClusterMessage) {
68 69 onMsg((ClusterAPIProtos.ClusterMessage) msg);
69 70 } else if (msg instanceof RpcBroadcastMsg) {
... ... @@ -163,6 +164,7 @@ public class RpcManagerActor extends ContextAwareActor {
163 164 log.info("[{}] session closed. Should reconnect: {}", remoteAddress, reconnect);
164 165 SessionActorInfo sessionRef = sessionActors.get(remoteAddress);
165 166 if (sessionRef != null && context().sender() != null && context().sender().equals(sessionRef.actor)) {
  167 + context().stop(sessionRef.actor);
166 168 sessionActors.remove(remoteAddress);
167 169 pendingMsgs.remove(remoteAddress);
168 170 if (reconnect) {
... ... @@ -172,9 +174,13 @@ public class RpcManagerActor extends ContextAwareActor {
172 174 }
173 175
174 176 private void onCreateSessionRequest(RpcSessionCreateRequestMsg msg) {
175   - ActorRef actorRef = createSessionActor(msg);
176 177 if (msg.getRemoteAddress() != null) {
177   - register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef);
  178 + if (!sessionActors.containsKey(msg.getRemoteAddress())) {
  179 + ActorRef actorRef = createSessionActor(msg);
  180 + register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef);
  181 + }
  182 + } else {
  183 + createSessionActor(msg);
178 184 }
179 185 }
180 186
... ... @@ -193,7 +199,8 @@ public class RpcManagerActor extends ContextAwareActor {
193 199 private ActorRef createSessionActor(RpcSessionCreateRequestMsg msg) {
194 200 log.info("[{}] Creating session actor.", msg.getMsgUid());
195 201 ActorRef actor = context().actorOf(
196   - Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid())).withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
  202 + Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid()))
  203 + .withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
197 204 actor.tell(msg, context().self());
198 205 return actor;
199 206 }
... ... @@ -210,4 +217,14 @@ public class RpcManagerActor extends ContextAwareActor {
210 217 return new RpcManagerActor(context);
211 218 }
212 219 }
  220 +
  221 + @Override
  222 + public SupervisorStrategy supervisorStrategy() {
  223 + return strategy;
  224 + }
  225 +
  226 + private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> {
  227 + log.warn("Unknown failure", t);
  228 + return SupervisorStrategy.resume();
  229 + });
213 230 }
... ...
... ... @@ -15,12 +15,10 @@
15 15 */
16 16 package org.thingsboard.server.actors.rpc;
17 17
18   -import akka.event.Logging;
19   -import akka.event.LoggingAdapter;
20   -import io.grpc.Channel;
21 18 import io.grpc.ManagedChannel;
22 19 import io.grpc.ManagedChannelBuilder;
23 20 import io.grpc.stub.StreamObserver;
  21 +import lombok.extern.slf4j.Slf4j;
24 22 import org.thingsboard.server.actors.ActorSystemContext;
25 23 import org.thingsboard.server.actors.service.ContextAwareActor;
26 24 import org.thingsboard.server.actors.service.ContextBasedCreator;
... ... @@ -38,15 +36,15 @@ import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CO
38 36 /**
39 37 * @author Andrew Shvayka
40 38 */
  39 +@Slf4j
41 40 public class RpcSessionActor extends ContextAwareActor {
42 41
43   - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
44 42
45 43 private final UUID sessionId;
46 44 private GrpcSession session;
47 45 private GrpcSessionListener listener;
48 46
49   - public RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) {
  47 + private RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) {
50 48 super(systemContext);
51 49 this.sessionId = sessionId;
52 50 }
... ... @@ -58,7 +56,7 @@ public class RpcSessionActor extends ContextAwareActor {
58 56 }
59 57
60 58 @Override
61   - public void onReceive(Object msg) throws Exception {
  59 + public void onReceive(Object msg) {
62 60 if (msg instanceof ClusterAPIProtos.ClusterMessage) {
63 61 tell((ClusterAPIProtos.ClusterMessage) msg);
64 62 } else if (msg instanceof RpcSessionCreateRequestMsg) {
... ... @@ -67,19 +65,29 @@ public class RpcSessionActor extends ContextAwareActor {
67 65 }
68 66
69 67 private void tell(ClusterAPIProtos.ClusterMessage msg) {
70   - session.sendMsg(msg);
  68 + if (session != null) {
  69 + session.sendMsg(msg);
  70 + } else {
  71 + log.trace("Failed to send message due to missing session!");
  72 + }
71 73 }
72 74
73 75 @Override
74 76 public void postStop() {
75   - log.info("Closing session -> {}", session.getRemoteServer());
76   - session.close();
  77 + if (session != null) {
  78 + log.info("Closing session -> {}", session.getRemoteServer());
  79 + try {
  80 + session.close();
  81 + } catch (RuntimeException e) {
  82 + log.trace("Failed to close session!", e);
  83 + }
  84 + }
77 85 }
78 86
79 87 private void initSession(RpcSessionCreateRequestMsg msg) {
80 88 log.info("[{}] Initializing session", context().self());
81 89 ServerAddress remoteServer = msg.getRemoteAddress();
82   - listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self());
  90 + listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self());
83 91 if (msg.getRemoteAddress() == null) {
84 92 // Server session
85 93 session = new GrpcSession(listener);
... ... @@ -113,7 +121,7 @@ public class RpcSessionActor extends ContextAwareActor {
113 121 }
114 122
115 123 @Override
116   - public RpcSessionActor create() throws Exception {
  124 + public RpcSessionActor create() {
117 125 return new RpcSessionActor(context, sessionId);
118 126 }
119 127 }
... ...
... ... @@ -37,7 +37,7 @@ public abstract class ContextAwareActor extends UntypedActor {
37 37 }
38 38
39 39 @Override
40   - public void onReceive(Object msg) throws Exception {
  40 + public void onReceive(Object msg) {
41 41 if (log.isDebugEnabled()) {
42 42 log.debug("Processing msg: {}", msg);
43 43 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.executors;
  17 +
  18 +import org.springframework.beans.factory.annotation.Value;
  19 +import org.springframework.stereotype.Component;
  20 +
  21 +@Component
  22 +public class ClusterRpcCallbackExecutorService extends AbstractListeningExecutor {
  23 +
  24 + @Value("${actors.cluster.grpc_callback_thread_pool_size}")
  25 + private int grpcCallbackExecutorThreadPoolSize;
  26 +
  27 + @Override
  28 + protected int getThreadPollSize() {
  29 + return grpcCallbackExecutorThreadPoolSize;
  30 + }
  31 +
  32 +}
... ...
... ... @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
23 23 import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
24 24
25 25 import java.util.Collections;
  26 +import java.util.UUID;
26 27
27 28 import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
28 29
... ... @@ -47,4 +48,10 @@ public class DefaultDeviceSessionCacheService implements DeviceSessionCacheServi
47 48 return sessions;
48 49 }
49 50
  51 + public static void main (String[] args){
  52 + UUID uuid = UUID.fromString("d5db434e-9cd2-4903-8b3b-421b2d93664d");
  53 + System.out.println(uuid.getMostSignificantBits());
  54 + System.out.println(uuid.getLeastSignificantBits());
  55 + }
  56 +
50 57 }
... ...
... ... @@ -153,6 +153,8 @@ sql:
153 153
154 154 # Actor system parameters
155 155 actors:
  156 + cluster:
  157 + grpc_callback_thread_pool_size: "${ACTORS_CLUSTER_GRPC_CALLBACK_THREAD_POOL_SIZE:10}"
156 158 tenant:
157 159 create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}"
158 160 session:
... ...