Commit ef164467bf7981b25f876bab1c6a78c5f8d3d277

Authored by Igor Kulikov
2 parents cc873d4f 4255bb56

Merge branch 'master' of github.com:thingsboard/thingsboard

@@ -201,7 +201,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -201,7 +201,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
201 201
202 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) { 202 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
203 return entry -> { 203 return entry -> {
204 - ToDeviceRpcRequestActorMsg requestActorMsg = entry.getValue().getMsg();  
205 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg(); 204 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
206 ToDeviceRpcRequestBody body = request.getBody(); 205 ToDeviceRpcRequestBody body = request.getBody();
207 if (request.isOneway()) { 206 if (request.isOneway()) {
@@ -486,6 +485,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -486,6 +485,12 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
486 sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime()); 485 sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
487 sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription()); 486 sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
488 sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription()); 487 sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
  488 + if (subscriptionInfo.getAttributeSubscription()) {
  489 + attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
  490 + }
  491 + if (subscriptionInfo.getRpcSubscription()) {
  492 + rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
  493 + }
489 } 494 }
490 dumpSessions(); 495 dumpSessions();
491 } 496 }
@@ -618,8 +623,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -618,8 +623,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
618 } 623 }
619 624
620 private void restoreSessions() { 625 private void restoreSessions() {
  626 + logger.debug("[{}] Restoring sessions from cache", deviceId);
621 TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId); 627 TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
622 if (sessionsDump.getSerializedSize() == 0) { 628 if (sessionsDump.getSerializedSize() == 0) {
  629 + logger.debug("[{}] No session information found", deviceId);
623 return; 630 return;
624 } 631 }
625 for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) { 632 for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
@@ -627,18 +634,23 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -627,18 +634,23 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
627 UUID sessionId = getSessionId(sessionInfoProto); 634 UUID sessionId = getSessionId(sessionInfoProto);
628 SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()); 635 SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
629 TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo(); 636 TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
630 - SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());  
631 - sessions.put(sessionId, sessionInfoMetaData);  
632 - if (subInfo.getAttributeSubscription()) {  
633 - rpcSubscriptions.put(sessionId, sessionInfo);  
634 - } 637 + SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
  638 + sessions.put(sessionId, sessionMD);
635 if (subInfo.getAttributeSubscription()) { 639 if (subInfo.getAttributeSubscription()) {
636 attributeSubscriptions.put(sessionId, sessionInfo); 640 attributeSubscriptions.put(sessionId, sessionInfo);
  641 + sessionMD.setSubscribedToAttributes(true);
  642 + }
  643 + if (subInfo.getRpcSubscription()) {
  644 + rpcSubscriptions.put(sessionId, sessionInfo);
  645 + sessionMD.setSubscribedToRPC(true);
637 } 646 }
  647 + logger.debug("[{}] Restored session: {}", deviceId, sessionMD);
638 } 648 }
  649 + logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
639 } 650 }
640 651
641 private void dumpSessions() { 652 private void dumpSessions() {
  653 + logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
642 List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size()); 654 List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
643 sessions.forEach((uuid, sessionMD) -> { 655 sessions.forEach((uuid, sessionMD) -> {
644 if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) { 656 if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
@@ -656,6 +668,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -656,6 +668,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
656 sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder() 668 sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
657 .setSessionInfo(sessionInfoProto) 669 .setSessionInfo(sessionInfoProto)
658 .setSubscriptionInfo(subscriptionInfoProto).build()); 670 .setSubscriptionInfo(subscriptionInfoProto).build());
  671 + logger.debug("[{}] Dumping session: {}", deviceId, sessionMD);
659 }); 672 });
660 systemContext.getDeviceSessionCacheService() 673 systemContext.getDeviceSessionCacheService()
661 .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder() 674 .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
@@ -32,7 +32,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener { @@ -32,7 +32,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
32 private final ActorRef manager; 32 private final ActorRef manager;
33 private final ActorRef self; 33 private final ActorRef self;
34 34
35 - public BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) { 35 + BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) {
36 this.service = service; 36 this.service = service;
37 this.manager = manager; 37 this.manager = manager;
38 this.self = self; 38 this.self = self;
@@ -103,10 +103,10 @@ public class RpcManagerActor extends ContextAwareActor { @@ -103,10 +103,10 @@ public class RpcManagerActor extends ContextAwareActor {
103 ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE); 103 ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort(), ServerType.CORE);
104 SessionActorInfo session = sessionActors.get(address); 104 SessionActorInfo session = sessionActors.get(address);
105 if (session != null) { 105 if (session != null) {
106 - log.debug("{} Forwarding msg to session actor", address); 106 + log.debug("{} Forwarding msg to session actor: {}", address, msg);
107 session.getActor().tell(msg, ActorRef.noSender()); 107 session.getActor().tell(msg, ActorRef.noSender());
108 } else { 108 } else {
109 - log.debug("{} Storing msg to pending queue", address); 109 + log.debug("{} Storing msg to pending queue: {}", address, msg);
110 Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address); 110 Queue<ClusterAPIProtos.ClusterMessage> queue = pendingMsgs.get(address);
111 if (queue == null) { 111 if (queue == null) {
112 queue = new LinkedList<>(); 112 queue = new LinkedList<>();
@@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor { @@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor {
116 queue.add(msg); 116 queue.add(msg);
117 } 117 }
118 } else { 118 } else {
119 - logger.warning("Cluster msg doesn't have set Server Address [{}]", msg); 119 + logger.warning("Cluster msg doesn't have server address [{}]", msg);
120 } 120 }
121 } 121 }
122 122
@@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rpc; @@ -18,6 +18,7 @@ package org.thingsboard.server.actors.rpc;
18 import akka.event.Logging; 18 import akka.event.Logging;
19 import akka.event.LoggingAdapter; 19 import akka.event.LoggingAdapter;
20 import io.grpc.Channel; 20 import io.grpc.Channel;
  21 +import io.grpc.ManagedChannel;
21 import io.grpc.ManagedChannelBuilder; 22 import io.grpc.ManagedChannelBuilder;
22 import io.grpc.stub.StreamObserver; 23 import io.grpc.stub.StreamObserver;
23 import org.thingsboard.server.actors.ActorSystemContext; 24 import org.thingsboard.server.actors.ActorSystemContext;
@@ -88,8 +89,8 @@ public class RpcSessionActor extends ContextAwareActor { @@ -88,8 +89,8 @@ public class RpcSessionActor extends ContextAwareActor {
88 systemContext.getRpcService().onSessionCreated(msg.getMsgUid(), session.getInputStream()); 89 systemContext.getRpcService().onSessionCreated(msg.getMsgUid(), session.getInputStream());
89 } else { 90 } else {
90 // Client session 91 // Client session
91 - Channel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext(true).build();  
92 - session = new GrpcSession(remoteServer, listener); 92 + ManagedChannel channel = ManagedChannelBuilder.forAddress(remoteServer.getHost(), remoteServer.getPort()).usePlaintext().build();
  93 + session = new GrpcSession(remoteServer, listener, channel);
93 session.initInputStream(); 94 session.initInputStream();
94 95
95 ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel); 96 ClusterRpcServiceGrpc.ClusterRpcServiceStub stub = ClusterRpcServiceGrpc.newStub(channel);
@@ -108,7 +108,7 @@ public class TenantActor extends RuleChainManagerActor { @@ -108,7 +108,7 @@ public class TenantActor extends RuleChainManagerActor {
108 @Override 108 @Override
109 protected void broadcast(Object msg) { 109 protected void broadcast(Object msg) {
110 super.broadcast(msg); 110 super.broadcast(msg);
111 - deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender())); 111 +// deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
112 } 112 }
113 113
114 private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) { 114 private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
@@ -15,6 +15,8 @@ @@ -15,6 +15,8 @@
15 */ 15 */
16 package org.thingsboard.server.service.cluster.rpc; 16 package org.thingsboard.server.service.cluster.rpc;
17 17
  18 +import io.grpc.Channel;
  19 +import io.grpc.ManagedChannel;
18 import io.grpc.stub.StreamObserver; 20 import io.grpc.stub.StreamObserver;
19 import lombok.Data; 21 import lombok.Data;
20 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
@@ -34,6 +36,7 @@ public final class GrpcSession implements Closeable { @@ -34,6 +36,7 @@ public final class GrpcSession implements Closeable {
34 private final UUID sessionId; 36 private final UUID sessionId;
35 private final boolean client; 37 private final boolean client;
36 private final GrpcSessionListener listener; 38 private final GrpcSessionListener listener;
  39 + private final ManagedChannel channel;
37 private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream; 40 private StreamObserver<ClusterAPIProtos.ClusterMessage> inputStream;
38 private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream; 41 private StreamObserver<ClusterAPIProtos.ClusterMessage> outputStream;
39 42
@@ -41,10 +44,10 @@ public final class GrpcSession implements Closeable { @@ -41,10 +44,10 @@ public final class GrpcSession implements Closeable {
41 private ServerAddress remoteServer; 44 private ServerAddress remoteServer;
42 45
43 public GrpcSession(GrpcSessionListener listener) { 46 public GrpcSession(GrpcSessionListener listener) {
44 - this(null, listener); 47 + this(null, listener, null);
45 } 48 }
46 49
47 - public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener) { 50 + public GrpcSession(ServerAddress remoteServer, GrpcSessionListener listener, ManagedChannel channel) {
48 this.sessionId = UUID.randomUUID(); 51 this.sessionId = UUID.randomUUID();
49 this.listener = listener; 52 this.listener = listener;
50 if (remoteServer != null) { 53 if (remoteServer != null) {
@@ -54,6 +57,7 @@ public final class GrpcSession implements Closeable { @@ -54,6 +57,7 @@ public final class GrpcSession implements Closeable {
54 } else { 57 } else {
55 this.client = false; 58 this.client = false;
56 } 59 }
  60 + this.channel = channel;
57 } 61 }
58 62
59 public void initInputStream() { 63 public void initInputStream() {
@@ -105,5 +109,8 @@ public final class GrpcSession implements Closeable { @@ -105,5 +109,8 @@ public final class GrpcSession implements Closeable {
105 } catch (IllegalStateException e) { 109 } catch (IllegalStateException e) {
106 log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage()); 110 log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage());
107 } 111 }
  112 + if (channel != null) {
  113 + channel.shutdownNow();
  114 + }
108 } 115 }
109 } 116 }
@@ -30,7 +30,6 @@ import org.thingsboard.rule.engine.api.NodeConfiguration; @@ -30,7 +30,6 @@ import org.thingsboard.rule.engine.api.NodeConfiguration;
30 import org.thingsboard.rule.engine.api.NodeDefinition; 30 import org.thingsboard.rule.engine.api.NodeDefinition;
31 import org.thingsboard.rule.engine.api.RuleNode; 31 import org.thingsboard.rule.engine.api.RuleNode;
32 import org.thingsboard.rule.engine.api.TbRelationTypes; 32 import org.thingsboard.rule.engine.api.TbRelationTypes;
33 -import org.thingsboard.server.common.data.DataConstants;  
34 import org.thingsboard.server.common.data.plugin.ComponentDescriptor; 33 import org.thingsboard.server.common.data.plugin.ComponentDescriptor;
35 import org.thingsboard.server.common.data.plugin.ComponentType; 34 import org.thingsboard.server.common.data.plugin.ComponentType;
36 import org.thingsboard.server.dao.component.ComponentDescriptorService; 35 import org.thingsboard.server.dao.component.ComponentDescriptorService;
@@ -52,6 +51,7 @@ import java.util.Set; @@ -52,6 +51,7 @@ import java.util.Set;
52 @Slf4j 51 @Slf4j
53 public class AnnotationComponentDiscoveryService implements ComponentDiscoveryService { 52 public class AnnotationComponentDiscoveryService implements ComponentDiscoveryService {
54 53
  54 + public static final int MAX_OPTIMISITC_RETRIES = 3;
55 @Value("${plugins.scan_packages}") 55 @Value("${plugins.scan_packages}")
56 private String[] scanPackages; 56 private String[] scanPackages;
57 57
@@ -81,17 +81,27 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe @@ -81,17 +81,27 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
81 private void registerRuleNodeComponents() { 81 private void registerRuleNodeComponents() {
82 Set<BeanDefinition> ruleNodeBeanDefinitions = getBeanDefinitions(RuleNode.class); 82 Set<BeanDefinition> ruleNodeBeanDefinitions = getBeanDefinitions(RuleNode.class);
83 for (BeanDefinition def : ruleNodeBeanDefinitions) { 83 for (BeanDefinition def : ruleNodeBeanDefinitions) {
84 - try {  
85 - String clazzName = def.getBeanClassName();  
86 - Class<?> clazz = Class.forName(clazzName);  
87 - RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class);  
88 - ComponentType type = ruleNodeAnnotation.type();  
89 - ComponentDescriptor component = scanAndPersistComponent(def, type);  
90 - components.put(component.getClazz(), component);  
91 - componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component);  
92 - } catch (Exception e) {  
93 - log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);  
94 - throw new RuntimeException(e); 84 + int retryCount = 0;
  85 + Exception cause = null;
  86 + while (retryCount < MAX_OPTIMISITC_RETRIES) {
  87 + try {
  88 + String clazzName = def.getBeanClassName();
  89 + Class<?> clazz = Class.forName(clazzName);
  90 + RuleNode ruleNodeAnnotation = clazz.getAnnotation(RuleNode.class);
  91 + ComponentType type = ruleNodeAnnotation.type();
  92 + ComponentDescriptor component = scanAndPersistComponent(def, type);
  93 + components.put(component.getClazz(), component);
  94 + componentsMap.computeIfAbsent(type, k -> new ArrayList<>()).add(component);
  95 + break;
  96 + } catch (Exception e) {
  97 + log.trace("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
  98 + cause = e;
  99 + retryCount++;
  100 + }
  101 + }
  102 + if (cause != null && retryCount == MAX_OPTIMISITC_RETRIES) {
  103 + log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), cause.getMessage(), cause);
  104 + throw new RuntimeException(cause);
95 } 105 }
96 } 106 }
97 } 107 }
@@ -133,66 +133,48 @@ public class LocalTransportService extends AbstractTransportService implements R @@ -133,66 +133,48 @@ public class LocalTransportService extends AbstractTransportService implements R
133 } 133 }
134 134
135 @Override 135 @Override
136 - public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {  
137 - if (checkLimits(sessionInfo, callback)) {  
138 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);  
139 - } 136 + protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
  137 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
140 } 138 }
141 139
142 @Override 140 @Override
143 - public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {  
144 - if (checkLimits(sessionInfo, callback)) {  
145 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);  
146 - } 141 + protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
  142 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build(), callback);
147 } 143 }
148 144
149 @Override 145 @Override
150 - public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {  
151 - if (checkLimits(sessionInfo, callback)) {  
152 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);  
153 - } 146 + protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
  147 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build(), callback);
154 } 148 }
155 149
156 @Override 150 @Override
157 - public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {  
158 - if (checkLimits(sessionInfo, callback)) {  
159 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);  
160 - } 151 + protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
  152 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
161 } 153 }
162 154
163 @Override 155 @Override
164 public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) { 156 public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
165 - if (checkLimits(sessionInfo, callback)) {  
166 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);  
167 - } 157 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
168 } 158 }
169 159
170 @Override 160 @Override
171 - public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {  
172 - if (checkLimits(sessionInfo, callback)) {  
173 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);  
174 - } 161 + protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
  162 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
175 } 163 }
176 164
177 @Override 165 @Override
178 - public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {  
179 - if (checkLimits(sessionInfo, callback)) {  
180 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);  
181 - } 166 + protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
  167 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
182 } 168 }
183 169
184 @Override 170 @Override
185 - public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {  
186 - if (checkLimits(sessionInfo, callback)) {  
187 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);  
188 - } 171 + protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
  172 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
189 } 173 }
190 174
191 @Override 175 @Override
192 - public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {  
193 - if (checkLimits(sessionInfo, callback)) {  
194 - forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);  
195 - } 176 + protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
  177 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build(), callback);
196 } 178 }
197 179
198 @Override 180 @Override
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; @@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
26 import org.apache.kafka.clients.producer.ProducerRecord; 26 import org.apache.kafka.clients.producer.ProducerRecord;
27 import org.apache.kafka.clients.producer.RecordMetadata; 27 import org.apache.kafka.clients.producer.RecordMetadata;
28 import org.apache.kafka.common.PartitionInfo; 28 import org.apache.kafka.common.PartitionInfo;
  29 +import org.apache.kafka.common.errors.TopicExistsException;
29 import org.apache.kafka.common.header.Header; 30 import org.apache.kafka.common.header.Header;
30 31
31 import java.util.List; 32 import java.util.List;
@@ -75,7 +76,11 @@ public class TBKafkaProducerTemplate<T> { @@ -75,7 +76,11 @@ public class TBKafkaProducerTemplate<T> {
75 CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1)); 76 CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1));
76 result.all().get(); 77 result.all().get();
77 } catch (Exception e) { 78 } catch (Exception e) {
78 - log.trace("Failed to create topic: {}", e.getMessage(), e); 79 + if ((e instanceof TopicExistsException) || (e.getCause() != null && e.getCause() instanceof TopicExistsException)) {
  80 + log.trace("[{}] Topic already exists: ", defaultTopic);
  81 + } else {
  82 + log.trace("[{}] Failed to create topic: {}", defaultTopic, e.getMessage(), e);
  83 + }
79 } 84 }
80 //Maybe this should not be cached, but we don't plan to change size of partitions 85 //Maybe this should not be cached, but we don't plan to change size of partitions
81 this.partitionInfoMap = new ConcurrentHashMap<>(); 86 this.partitionInfoMap = new ConcurrentHashMap<>();
@@ -68,44 +68,68 @@ public abstract class AbstractTransportService implements TransportService { @@ -68,44 +68,68 @@ public abstract class AbstractTransportService implements TransportService {
68 68
69 @Override 69 @Override
70 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) { 70 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
71 - reportActivityInternal(sessionInfo); 71 + if (checkLimits(sessionInfo, callback)) {
  72 + reportActivityInternal(sessionInfo);
  73 + doProcess(sessionInfo, msg, callback);
  74 + }
72 } 75 }
73 76
74 @Override 77 @Override
75 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { 78 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
76 - reportActivityInternal(sessionInfo); 79 + if (checkLimits(sessionInfo, callback)) {
  80 + reportActivityInternal(sessionInfo);
  81 + doProcess(sessionInfo, msg, callback);
  82 + }
77 } 83 }
78 84
79 @Override 85 @Override
80 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) { 86 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
81 - reportActivityInternal(sessionInfo); 87 + if (checkLimits(sessionInfo, callback)) {
  88 + reportActivityInternal(sessionInfo);
  89 + doProcess(sessionInfo, msg, callback);
  90 + }
82 } 91 }
83 92
84 @Override 93 @Override
85 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) { 94 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
86 - reportActivityInternal(sessionInfo); 95 + if (checkLimits(sessionInfo, callback)) {
  96 + reportActivityInternal(sessionInfo);
  97 + doProcess(sessionInfo, msg, callback);
  98 + }
87 } 99 }
88 100
89 @Override 101 @Override
90 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) { 102 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
91 - SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);  
92 - sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); 103 + if (checkLimits(sessionInfo, callback)) {
  104 + SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
  105 + sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
  106 + doProcess(sessionInfo, msg, callback);
  107 + }
93 } 108 }
94 109
95 @Override 110 @Override
96 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) { 111 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
97 - SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);  
98 - sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); 112 + if (checkLimits(sessionInfo, callback)) {
  113 + SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
  114 + sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
  115 + doProcess(sessionInfo, msg, callback);
  116 + }
99 } 117 }
100 118
101 @Override 119 @Override
102 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) { 120 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
103 - reportActivityInternal(sessionInfo); 121 + if (checkLimits(sessionInfo, callback)) {
  122 + reportActivityInternal(sessionInfo);
  123 + doProcess(sessionInfo, msg, callback);
  124 + }
104 } 125 }
105 126
106 @Override 127 @Override
107 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) { 128 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
108 - reportActivityInternal(sessionInfo); 129 + if (checkLimits(sessionInfo, callback)) {
  130 + reportActivityInternal(sessionInfo);
  131 + doProcess(sessionInfo, msg, callback);
  132 + }
109 } 133 }
110 134
111 @Override 135 @Override
@@ -113,6 +137,22 @@ public abstract class AbstractTransportService implements TransportService { @@ -113,6 +137,22 @@ public abstract class AbstractTransportService implements TransportService {
113 reportActivityInternal(sessionInfo); 137 reportActivityInternal(sessionInfo);
114 } 138 }
115 139
  140 + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
  141 +
  142 + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
  143 +
  144 + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
  145 +
  146 + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback);
  147 +
  148 + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback);
  149 +
  150 + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback);
  151 +
  152 + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback);
  153 +
  154 + protected abstract void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
  155 +
116 private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { 156 private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
117 UUID sessionId = toId(sessionInfo); 157 UUID sessionId = toId(sessionInfo);
118 SessionMetaData sessionMetaData = sessions.get(sessionId); 158 SessionMetaData sessionMetaData = sessions.get(sessionId);
@@ -217,102 +217,84 @@ public class RemoteTransportService extends AbstractTransportService { @@ -217,102 +217,84 @@ public class RemoteTransportService extends AbstractTransportService {
217 } 217 }
218 218
219 @Override 219 @Override
220 - public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {  
221 - if (checkLimits(sessionInfo, callback)) {  
222 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
223 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
224 - .setSessionEvent(msg).build()  
225 - ).build();  
226 - send(sessionInfo, toRuleEngineMsg, callback);  
227 - } 220 + public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
  221 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  222 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  223 + .setSubscriptionInfo(msg).build()
  224 + ).build();
  225 + send(sessionInfo, toRuleEngineMsg, callback);
228 } 226 }
229 227
230 @Override 228 @Override
231 - public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {  
232 - if (checkLimits(sessionInfo, callback)) {  
233 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
234 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
235 - .setSubscriptionInfo(msg).build()  
236 - ).build();  
237 - send(sessionInfo, toRuleEngineMsg, callback);  
238 - } 229 + protected void doProcess(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
  230 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  231 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  232 + .setSessionEvent(msg).build()
  233 + ).build();
  234 + send(sessionInfo, toRuleEngineMsg, callback);
239 } 235 }
240 236
241 @Override 237 @Override
242 - public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {  
243 - if (checkLimits(sessionInfo, callback)) {  
244 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
245 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
246 - .setPostTelemetry(msg).build()  
247 - ).build();  
248 - send(sessionInfo, toRuleEngineMsg, callback);  
249 - } 238 + protected void doProcess(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
  239 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  240 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  241 + .setPostTelemetry(msg).build()
  242 + ).build();
  243 + send(sessionInfo, toRuleEngineMsg, callback);
250 } 244 }
251 245
252 @Override 246 @Override
253 - public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {  
254 - if (checkLimits(sessionInfo, callback)) {  
255 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
256 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
257 - .setPostAttributes(msg).build()  
258 - ).build();  
259 - send(sessionInfo, toRuleEngineMsg, callback);  
260 - } 247 + protected void doProcess(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
  248 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  249 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  250 + .setPostAttributes(msg).build()
  251 + ).build();
  252 + send(sessionInfo, toRuleEngineMsg, callback);
261 } 253 }
262 254
263 @Override 255 @Override
264 - public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {  
265 - if (checkLimits(sessionInfo, callback)) {  
266 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
267 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
268 - .setGetAttributes(msg).build()  
269 - ).build();  
270 - send(sessionInfo, toRuleEngineMsg, callback);  
271 - } 256 + protected void doProcess(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
  257 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  258 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  259 + .setGetAttributes(msg).build()
  260 + ).build();
  261 + send(sessionInfo, toRuleEngineMsg, callback);
272 } 262 }
273 263
274 @Override 264 @Override
275 - public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {  
276 - if (checkLimits(sessionInfo, callback)) {  
277 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
278 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
279 - .setSubscribeToAttributes(msg).build()  
280 - ).build();  
281 - send(sessionInfo, toRuleEngineMsg, callback);  
282 - } 265 + protected void doProcess(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
  266 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  267 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  268 + .setSubscribeToAttributes(msg).build()
  269 + ).build();
  270 + send(sessionInfo, toRuleEngineMsg, callback);
283 } 271 }
284 272
285 @Override 273 @Override
286 - public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {  
287 - if (checkLimits(sessionInfo, callback)) {  
288 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
289 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
290 - .setSubscribeToRPC(msg).build()  
291 - ).build();  
292 - send(sessionInfo, toRuleEngineMsg, callback);  
293 - } 274 + protected void doProcess(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
  275 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  276 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  277 + .setSubscribeToRPC(msg).build()
  278 + ).build();
  279 + send(sessionInfo, toRuleEngineMsg, callback);
294 } 280 }
295 281
296 @Override 282 @Override
297 - public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {  
298 - if (checkLimits(sessionInfo, callback)) {  
299 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
300 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
301 - .setToDeviceRPCCallResponse(msg).build()  
302 - ).build();  
303 - send(sessionInfo, toRuleEngineMsg, callback);  
304 - } 283 + protected void doProcess(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
  284 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  285 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  286 + .setToDeviceRPCCallResponse(msg).build()
  287 + ).build();
  288 + send(sessionInfo, toRuleEngineMsg, callback);
305 } 289 }
306 290
307 @Override 291 @Override
308 - public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {  
309 - if (checkLimits(sessionInfo, callback)) {  
310 - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(  
311 - TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)  
312 - .setToServerRPCCallRequest(msg).build()  
313 - ).build();  
314 - send(sessionInfo, toRuleEngineMsg, callback);  
315 - } 292 + protected void doProcess(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
  293 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  294 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  295 + .setToServerRPCCallRequest(msg).build()
  296 + ).build();
  297 + send(sessionInfo, toRuleEngineMsg, callback);
316 } 298 }
317 299
318 private static class TransportCallbackAdaptor implements Callback { 300 private static class TransportCallbackAdaptor implements Callback {
@@ -40,7 +40,7 @@ @@ -40,7 +40,7 @@
40 </encoder> 40 </encoder>
41 </appender> 41 </appender>
42 42
43 - <logger name="org.thingsboard.server" level="INFO" /> 43 + <logger name="org.thingsboard.server" level="TRACE" />
44 <logger name="akka" level="INFO" /> 44 <logger name="akka" level="INFO" />
45 45
46 <root level="INFO"> 46 <root level="INFO">