Commit 8ab4f144b2713b2692f7b260918825f9258b107d

Authored by Andrew Shvayka
1 parent aa4787e9

Implementation of Session timeouts

Showing 24 changed files with 460 additions and 82 deletions
@@ -32,3 +32,4 @@ pom.xml.versionsBackup @@ -32,3 +32,4 @@ pom.xml.versionsBackup
32 **/Californium.properties 32 **/Californium.properties
33 **/.env 33 **/.env
34 .instance_id 34 .instance_id
  35 +rebuild-docker.sh
@@ -67,6 +67,7 @@ import org.thingsboard.server.service.mail.MailExecutorService; @@ -67,6 +67,7 @@ import org.thingsboard.server.service.mail.MailExecutorService;
67 import org.thingsboard.server.service.rpc.DeviceRpcService; 67 import org.thingsboard.server.service.rpc.DeviceRpcService;
68 import org.thingsboard.server.service.script.JsExecutorService; 68 import org.thingsboard.server.service.script.JsExecutorService;
69 import org.thingsboard.server.service.script.JsInvokeService; 69 import org.thingsboard.server.service.script.JsInvokeService;
  70 +import org.thingsboard.server.service.session.DeviceSessionCacheService;
70 import org.thingsboard.server.service.state.DeviceStateService; 71 import org.thingsboard.server.service.state.DeviceStateService;
71 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; 72 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
72 import org.thingsboard.server.service.transport.RuleEngineTransportService; 73 import org.thingsboard.server.service.transport.RuleEngineTransportService;
@@ -201,6 +202,10 @@ public class ActorSystemContext { @@ -201,6 +202,10 @@ public class ActorSystemContext {
201 @Getter 202 @Getter
202 private DeviceStateService deviceStateService; 203 private DeviceStateService deviceStateService;
203 204
  205 + @Autowired
  206 + @Getter
  207 + private DeviceSessionCacheService deviceSessionCacheService;
  208 +
204 @Lazy 209 @Lazy
205 @Autowired 210 @Autowired
206 @Getter 211 @Getter
@@ -254,6 +259,14 @@ public class ActorSystemContext { @@ -254,6 +259,14 @@ public class ActorSystemContext {
254 @Getter 259 @Getter
255 private boolean allowSystemMailService; 260 private boolean allowSystemMailService;
256 261
  262 + @Value("${transport.sessions.inactivity_timeout}")
  263 + @Getter
  264 + private long sessionInactivityTimeout;
  265 +
  266 + @Value("${transport.sessions.report_timeout}")
  267 + @Getter
  268 + private long sessionReportTimeout;
  269 +
257 @Getter 270 @Getter
258 @Setter 271 @Setter
259 private ActorSystem actorSystem; 272 private ActorSystem actorSystem;
@@ -44,11 +44,19 @@ public class DeviceActor extends ContextAwareActor { @@ -44,11 +44,19 @@ public class DeviceActor extends ContextAwareActor {
44 } 44 }
45 45
46 @Override 46 @Override
  47 + public void preStart() {
  48 + logger.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
  49 + try {
  50 + processor.initSessionTimeout(context());
  51 + logger.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
  52 + } catch (Exception e) {
  53 + logger.error(e, "[{}][{}] Unknown failure", processor.tenantId, processor.deviceId);
  54 + }
  55 + }
  56 +
  57 + @Override
47 protected boolean process(TbActorMsg msg) { 58 protected boolean process(TbActorMsg msg) {
48 switch (msg.getMsgType()) { 59 switch (msg.getMsgType()) {
49 - case CLUSTER_EVENT_MSG:  
50 - processor.processClusterEventMsg((ClusterEventMsg) msg);  
51 - break;  
52 case TRANSPORT_TO_DEVICE_ACTOR_MSG: 60 case TRANSPORT_TO_DEVICE_ACTOR_MSG:
53 processor.process(context(), (TransportToDeviceActorMsgWrapper) msg); 61 processor.process(context(), (TransportToDeviceActorMsgWrapper) msg);
54 break; 62 break;
@@ -73,6 +81,9 @@ public class DeviceActor extends ContextAwareActor { @@ -73,6 +81,9 @@ public class DeviceActor extends ContextAwareActor {
73 case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG: 81 case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG:
74 processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg); 82 processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg);
75 break; 83 break;
  84 + case SESSION_TIMEOUT_MSG:
  85 + processor.checkSessionsTimeout();
  86 + break;
76 default: 87 default:
77 return false; 88 return false;
78 } 89 }
@@ -88,11 +88,11 @@ import java.util.stream.Collectors; @@ -88,11 +88,11 @@ import java.util.stream.Collectors;
88 /** 88 /**
89 * @author Andrew Shvayka 89 * @author Andrew Shvayka
90 */ 90 */
91 -public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { 91 +class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
92 92
93 - private final TenantId tenantId;  
94 - private final DeviceId deviceId;  
95 - private final Map<UUID, SessionInfo> sessions; 93 + final TenantId tenantId;
  94 + final DeviceId deviceId;
  95 + private final Map<UUID, SessionInfoMetaData> sessions;
96 private final Map<UUID, SessionInfo> attributeSubscriptions; 96 private final Map<UUID, SessionInfo> attributeSubscriptions;
97 private final Map<UUID, SessionInfo> rpcSubscriptions; 97 private final Map<UUID, SessionInfo> rpcSubscriptions;
98 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap; 98 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
@@ -116,6 +116,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -116,6 +116,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
116 this.toDeviceRpcPendingMap = new HashMap<>(); 116 this.toDeviceRpcPendingMap = new HashMap<>();
117 this.toServerRpcPendingMap = new HashMap<>(); 117 this.toServerRpcPendingMap = new HashMap<>();
118 initAttributes(); 118 initAttributes();
  119 + restoreSessions();
119 } 120 }
120 121
121 private void initAttributes() { 122 private void initAttributes() {
@@ -160,7 +161,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -160,7 +161,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
160 } else { 161 } else {
161 logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId()); 162 logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
162 } 163 }
163 -  
164 } 164 }
165 165
166 private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { 166 private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
@@ -174,7 +174,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -174,7 +174,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
174 if (requestMd != null) { 174 if (requestMd != null) {
175 logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); 175 logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
176 systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), 176 systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
177 - null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); 177 + null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
178 } 178 }
179 } 179 }
180 180
@@ -227,11 +227,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -227,11 +227,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
227 } 227 }
228 if (msg.hasPostAttributes()) { 228 if (msg.hasPostAttributes()) {
229 handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes()); 229 handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
230 - reportActivity(); 230 + reportLogicalDeviceActivity();
231 } 231 }
232 if (msg.hasPostTelemetry()) { 232 if (msg.hasPostTelemetry()) {
233 handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry()); 233 handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry());
234 - reportActivity(); 234 + reportLogicalDeviceActivity();
235 } 235 }
236 if (msg.hasGetAttributes()) { 236 if (msg.hasGetAttributes()) {
237 handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes()); 237 handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
@@ -241,11 +241,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -241,11 +241,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
241 } 241 }
242 if (msg.hasToServerRPCCallRequest()) { 242 if (msg.hasToServerRPCCallRequest()) {
243 handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest()); 243 handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest());
244 - reportActivity(); 244 + reportLogicalDeviceActivity();
  245 + }
  246 + if (msg.hasSubscriptionInfo()) {
  247 + handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo());
245 } 248 }
246 } 249 }
247 250
248 - private void reportActivity() { 251 + private void reportLogicalDeviceActivity() {
249 systemContext.getDeviceStateService().onDeviceActivity(deviceId); 252 systemContext.getDeviceStateService().onDeviceActivity(deviceId);
250 } 253 }
251 254
@@ -406,28 +409,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -406,28 +409,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
406 } 409 }
407 } 410 }
408 411
409 - void processClusterEventMsg(ClusterEventMsg msg) {  
410 -// if (!msg.isAdded()) {  
411 -// logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());  
412 -// Predicate<Map.Entry<SessionId, SessionInfo>> filter = e -> e.getValue().getServer()  
413 -// .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false);  
414 -// attributeSubscriptions.entrySet().removeIf(filter);  
415 -// rpcSubscriptions.entrySet().removeIf(filter);  
416 -// }  
417 - }  
418 -  
419 private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { 412 private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
420 UUID sessionId = getSessionId(sessionInfo); 413 UUID sessionId = getSessionId(sessionInfo);
421 if (subscribeCmd.getUnsubscribe()) { 414 if (subscribeCmd.getUnsubscribe()) {
422 logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId); 415 logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
423 attributeSubscriptions.remove(sessionId); 416 attributeSubscriptions.remove(sessionId);
424 } else { 417 } else {
425 - SessionInfo session = sessions.get(sessionId);  
426 - if (session == null) {  
427 - session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()); 418 + SessionInfoMetaData sessionMD = sessions.get(sessionId);
  419 + if (sessionMD == null) {
  420 + sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
428 } 421 }
  422 + sessionMD.setSubscribedToAttributes(true);
429 logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId); 423 logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
430 - attributeSubscriptions.put(sessionId, session); 424 + attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
  425 + dumpSessions();
431 } 426 }
432 } 427 }
433 428
@@ -441,20 +436,22 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -441,20 +436,22 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
441 logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId); 436 logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
442 rpcSubscriptions.remove(sessionId); 437 rpcSubscriptions.remove(sessionId);
443 } else { 438 } else {
444 - SessionInfo session = sessions.get(sessionId);  
445 - if (session == null) {  
446 - session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()); 439 + SessionInfoMetaData sessionMD = sessions.get(sessionId);
  440 + if (sessionMD == null) {
  441 + sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()));
447 } 442 }
  443 + sessionMD.setSubscribedToRPC(true);
448 logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); 444 logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
449 - rpcSubscriptions.put(sessionId, session); 445 + rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
450 sendPendingRequests(context, sessionId, sessionInfo); 446 sendPendingRequests(context, sessionId, sessionInfo);
  447 + dumpSessions();
451 } 448 }
452 } 449 }
453 450
454 private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { 451 private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
455 UUID sessionId = getSessionId(sessionInfo); 452 UUID sessionId = getSessionId(sessionInfo);
456 if (msg.getEvent() == SessionEvent.OPEN) { 453 if (msg.getEvent() == SessionEvent.OPEN) {
457 - if(sessions.containsKey(sessionId)){ 454 + if (sessions.containsKey(sessionId)) {
458 logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId); 455 logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
459 return; 456 return;
460 } 457 }
@@ -462,13 +459,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -462,13 +459,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
462 if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { 459 if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
463 UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); 460 UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
464 if (sessionIdToRemove != null) { 461 if (sessionIdToRemove != null) {
465 - closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove)); 462 + notifyTransportAboutClosedSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
466 } 463 }
467 } 464 }
468 - sessions.put(sessionId, new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId())); 465 + sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId())));
469 if (sessions.size() == 1) { 466 if (sessions.size() == 1) {
470 reportSessionOpen(); 467 reportSessionOpen();
471 } 468 }
  469 + dumpSessions();
472 } else if (msg.getEvent() == SessionEvent.CLOSED) { 470 } else if (msg.getEvent() == SessionEvent.CLOSED) {
473 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); 471 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
474 sessions.remove(sessionId); 472 sessions.remove(sessionId);
@@ -477,21 +475,34 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -477,21 +475,34 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
477 if (sessions.isEmpty()) { 475 if (sessions.isEmpty()) {
478 reportSessionClose(); 476 reportSessionClose();
479 } 477 }
  478 + dumpSessions();
  479 + }
  480 + }
  481 +
  482 + private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto subscriptionInfo) {
  483 + UUID sessionId = getSessionId(sessionInfo);
  484 + SessionInfoMetaData sessionMD = sessions.get(sessionId);
  485 + if (sessionMD != null) {
  486 + sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
  487 + sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
  488 + sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
480 } 489 }
  490 + dumpSessions();
481 } 491 }
482 492
483 void processCredentialsUpdate() { 493 void processCredentialsUpdate() {
484 - sessions.forEach(this::closeSession); 494 + sessions.forEach(this::notifyTransportAboutClosedSession);
485 attributeSubscriptions.clear(); 495 attributeSubscriptions.clear();
486 rpcSubscriptions.clear(); 496 rpcSubscriptions.clear();
  497 + dumpSessions();
487 } 498 }
488 499
489 - private void closeSession(UUID sessionId, SessionInfo sessionInfo) { 500 + private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd) {
490 DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() 501 DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
491 .setSessionIdMSB(sessionId.getMostSignificantBits()) 502 .setSessionIdMSB(sessionId.getMostSignificantBits())
492 .setSessionIdLSB(sessionId.getLeastSignificantBits()) 503 .setSessionIdLSB(sessionId.getLeastSignificantBits())
493 .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build(); 504 .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build();
494 - systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg); 505 + systemContext.getRuleEngineTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
495 } 506 }
496 507
497 void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) { 508 void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
@@ -605,4 +616,67 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -605,4 +616,67 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
605 } 616 }
606 return builder.build(); 617 return builder.build();
607 } 618 }
  619 +
  620 + private void restoreSessions() {
  621 + TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
  622 + if (sessionsDump.getSerializedSize() == 0) {
  623 + return;
  624 + }
  625 + for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
  626 + SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
  627 + UUID sessionId = getSessionId(sessionInfoProto);
  628 + SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId());
  629 + TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
  630 + SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
  631 + sessions.put(sessionId, sessionInfoMetaData);
  632 + if (subInfo.getAttributeSubscription()) {
  633 + rpcSubscriptions.put(sessionId, sessionInfo);
  634 + }
  635 + if (subInfo.getAttributeSubscription()) {
  636 + attributeSubscriptions.put(sessionId, sessionInfo);
  637 + }
  638 + }
  639 + }
  640 +
  641 + private void dumpSessions() {
  642 + List<TransportProtos.SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
  643 + sessions.forEach((uuid, sessionMD) -> {
  644 + if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) {
  645 + return;
  646 + }
  647 + SessionInfo sessionInfo = sessionMD.getSessionInfo();
  648 + TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder()
  649 + .setLastActivityTime(sessionMD.getLastActivityTime())
  650 + .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
  651 + .setRpcSubscription(sessionMD.isSubscribedToRPC()).build();
  652 + TransportProtos.SessionInfoProto sessionInfoProto = TransportProtos.SessionInfoProto.newBuilder()
  653 + .setSessionIdMSB(uuid.getMostSignificantBits())
  654 + .setSessionIdLSB(uuid.getLeastSignificantBits())
  655 + .setNodeId(sessionInfo.getNodeId()).build();
  656 + sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder()
  657 + .setSessionInfo(sessionInfoProto)
  658 + .setSubscriptionInfo(subscriptionInfoProto).build());
  659 + });
  660 + systemContext.getDeviceSessionCacheService()
  661 + .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder()
  662 + .addAllSessions(sessionsList).build());
  663 + }
  664 +
  665 + void initSessionTimeout(ActorContext context) {
  666 + schedulePeriodicMsgWithDelay(context, SessionTimeoutCheckMsg.instance(), systemContext.getSessionInactivityTimeout(), systemContext.getSessionInactivityTimeout());
  667 + }
  668 +
  669 + void checkSessionsTimeout() {
  670 + long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
  671 + Map<UUID, SessionInfoMetaData> sessionsToRemove = sessions.entrySet().stream().filter(kv -> kv.getValue().getLastActivityTime() < expTime).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  672 + sessionsToRemove.forEach((sessionId, sessionMD) -> {
  673 + sessions.remove(sessionId);
  674 + rpcSubscriptions.remove(sessionId);
  675 + attributeSubscriptions.remove(sessionId);
  676 + notifyTransportAboutClosedSession(sessionId, sessionMD);
  677 + });
  678 + if (!sessionsToRemove.isEmpty()) {
  679 + dumpSessions();
  680 + }
  681 + }
608 } 682 }
@@ -25,4 +25,5 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType; @@ -25,4 +25,5 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
25 public class SessionInfo { 25 public class SessionInfo {
26 private final SessionType type; 26 private final SessionType type;
27 private final String nodeId; 27 private final String nodeId;
  28 + private long lastActivityTime;
28 } 29 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.device;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
  20 +
  21 +/**
  22 + * @author Andrew Shvayka
  23 + */
  24 +@Data
  25 +class SessionInfoMetaData {
  26 + private final SessionInfo sessionInfo;
  27 + private long lastActivityTime;
  28 + private boolean subscribedToAttributes;
  29 + private boolean subscribedToRPC;
  30 +
  31 + SessionInfoMetaData(SessionInfo sessionInfo) {
  32 + this(sessionInfo, System.currentTimeMillis());
  33 + }
  34 +
  35 + SessionInfoMetaData(SessionInfo sessionInfo, long lastActivityTime) {
  36 + this.sessionInfo = sessionInfo;
  37 + this.lastActivityTime = lastActivityTime;
  38 + }
  39 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.device;
  17 +
  18 +import org.thingsboard.server.common.msg.MsgType;
  19 +import org.thingsboard.server.common.msg.TbActorMsg;
  20 +
  21 +/**
  22 + * Created by ashvayka on 29.10.18.
  23 + */
  24 +public class SessionTimeoutCheckMsg implements TbActorMsg {
  25 +
  26 + private static final SessionTimeoutCheckMsg INSTANCE = new SessionTimeoutCheckMsg();
  27 +
  28 + private SessionTimeoutCheckMsg() {
  29 + }
  30 +
  31 + public static SessionTimeoutCheckMsg instance() {
  32 + return INSTANCE;
  33 + }
  34 +
  35 + @Override
  36 + public MsgType getMsgType() {
  37 + return MsgType.SESSION_TIMEOUT_MSG;
  38 + }
  39 +}
@@ -40,43 +40,31 @@ public abstract class AbstractContextAwareMsgProcessor { @@ -40,43 +40,31 @@ public abstract class AbstractContextAwareMsgProcessor {
40 this.logger = logger; 40 this.logger = logger;
41 } 41 }
42 42
43 - protected ActorRef getAppActor() {  
44 - return systemContext.getAppActor();  
45 - }  
46 -  
47 - protected Scheduler getScheduler() { 43 + private Scheduler getScheduler() {
48 return systemContext.getScheduler(); 44 return systemContext.getScheduler();
49 } 45 }
50 46
51 - protected ExecutionContextExecutor getSystemDispatcher() { 47 + private ExecutionContextExecutor getSystemDispatcher() {
52 return systemContext.getActorSystem().dispatcher(); 48 return systemContext.getActorSystem().dispatcher();
53 } 49 }
54 50
55 protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs) { 51 protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs) {
56 - schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs, ctx.self()); 52 + schedulePeriodicMsgWithDelay(msg, delayInMs, periodInMs, ctx.self());
57 } 53 }
58 54
59 - protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs, ActorRef target) { 55 + private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) {
60 logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); 56 logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs);
61 getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null); 57 getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
62 } 58 }
63 59
64 -  
65 protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs) { 60 protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs) {
66 - scheduleMsgWithDelay(ctx, msg, delayInMs, ctx.self()); 61 + scheduleMsgWithDelay(msg, delayInMs, ctx.self());
67 } 62 }
68 63
69 - protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, ActorRef target) { 64 + private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) {
70 logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs); 65 logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
71 getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null); 66 getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null);
72 } 67 }
73 68
74 - @Data  
75 - @AllArgsConstructor  
76 - private static class ComponentConfiguration {  
77 - private final String clazz;  
78 - private final String name;  
79 - private final String configuration;  
80 - }  
81 69
82 } 70 }
@@ -127,7 +127,6 @@ public class TenantActor extends RuleChainManagerActor { @@ -127,7 +127,6 @@ public class TenantActor extends RuleChainManagerActor {
127 ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self()); 127 ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self());
128 } 128 }
129 129
130 -  
131 private void onToDeviceActorMsg(DeviceAwareMsg msg) { 130 private void onToDeviceActorMsg(DeviceAwareMsg msg) {
132 getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender()); 131 getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender());
133 } 132 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.session;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.cache.annotation.CachePut;
  20 +import org.springframework.cache.annotation.Cacheable;
  21 +import org.springframework.stereotype.Service;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
  24 +
  25 +import java.util.ArrayList;
  26 +import java.util.Collections;
  27 +
  28 +import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE;
  29 +
  30 +/**
  31 + * Created by ashvayka on 29.10.18.
  32 + */
  33 +@Service
  34 +@Slf4j
  35 +public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService {
  36 +
  37 + @Override
  38 + @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId")
  39 + public DeviceSessionsCacheEntry get(DeviceId deviceId) {
  40 + log.debug("[{}] Fetching session data from cache", deviceId);
  41 + return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build();
  42 + }
  43 +
  44 + @Override
  45 + @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId")
  46 + public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) {
  47 + log.debug("[{}] Pushing session data from cache: {}", deviceId, sessions);
  48 + return sessions;
  49 + }
  50 +}
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.session;
  17 +
  18 +import org.thingsboard.server.common.data.id.DeviceId;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
  20 +
  21 +/**
  22 + * Created by ashvayka on 29.10.18.
  23 + */
  24 +public interface DeviceSessionCacheService {
  25 +
  26 + DeviceSessionsCacheEntry get(DeviceId deviceId);
  27 +
  28 + DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions);
  29 +
  30 +}
@@ -161,6 +161,13 @@ public class LocalTransportService extends AbstractTransportService implements R @@ -161,6 +161,13 @@ public class LocalTransportService extends AbstractTransportService implements R
161 } 161 }
162 162
163 @Override 163 @Override
  164 + public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
  165 + if (checkLimits(sessionInfo, callback)) {
  166 + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
  167 + }
  168 + }
  169 +
  170 + @Override
164 public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) { 171 public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
165 if (checkLimits(sessionInfo, callback)) { 172 if (checkLimits(sessionInfo, callback)) {
166 forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback); 173 forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
@@ -202,6 +202,9 @@ caffeine: @@ -202,6 +202,9 @@ caffeine:
202 devices: 202 devices:
203 timeToLiveInMinutes: 1440 203 timeToLiveInMinutes: 1440
204 maxSize: 100000 204 maxSize: 100000
  205 + sessions:
  206 + timeToLiveInMinutes: 1440
  207 + maxSize: 100000
205 assets: 208 assets:
206 timeToLiveInMinutes: 1440 209 timeToLiveInMinutes: 1440
207 maxSize: 100000 210 maxSize: 100000
@@ -392,6 +395,9 @@ transport: @@ -392,6 +395,9 @@ transport:
392 auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}" 395 auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
393 notifications: 396 notifications:
394 topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}" 397 topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
  398 + sessions:
  399 + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
  400 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
395 rate_limits: 401 rate_limits:
396 enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" 402 enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
397 tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" 403 tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
@@ -19,6 +19,7 @@ public class CacheConstants { @@ -19,6 +19,7 @@ public class CacheConstants {
19 public static final String DEVICE_CREDENTIALS_CACHE = "deviceCredentials"; 19 public static final String DEVICE_CREDENTIALS_CACHE = "deviceCredentials";
20 public static final String RELATIONS_CACHE = "relations"; 20 public static final String RELATIONS_CACHE = "relations";
21 public static final String DEVICE_CACHE = "devices"; 21 public static final String DEVICE_CACHE = "devices";
  22 + public static final String SESSIONS_CACHE = "sessions";
22 public static final String ASSET_CACHE = "assets"; 23 public static final String ASSET_CACHE = "assets";
23 public static final String ENTITY_VIEW_CACHE = "entityViews"; 24 public static final String ENTITY_VIEW_CACHE = "entityViews";
24 } 25 }
@@ -96,13 +96,8 @@ public enum MsgType { @@ -96,13 +96,8 @@ public enum MsgType {
96 */ 96 */
97 DEVICE_ACTOR_TO_RULE_ENGINE_MSG, 97 DEVICE_ACTOR_TO_RULE_ENGINE_MSG,
98 98
99 - /**  
100 - * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue.  
101 - */  
102 - ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG,  
103 - TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,  
104 SESSION_TIMEOUT_MSG, 99 SESSION_TIMEOUT_MSG,
105 - SESSION_CTRL_MSG, 100 +
106 STATS_PERSIST_TICK_MSG, 101 STATS_PERSIST_TICK_MSG,
107 102
108 103
@@ -43,10 +43,10 @@ import org.thingsboard.server.common.transport.TransportService; @@ -43,10 +43,10 @@ import org.thingsboard.server.common.transport.TransportService;
43 import org.thingsboard.server.common.transport.TransportServiceCallback; 43 import org.thingsboard.server.common.transport.TransportServiceCallback;
44 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 44 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
45 import org.thingsboard.server.common.msg.EncryptionUtil; 45 import org.thingsboard.server.common.msg.EncryptionUtil;
  46 +import org.thingsboard.server.common.transport.service.AbstractTransportService;
46 import org.thingsboard.server.gen.transport.TransportProtos; 47 import org.thingsboard.server.gen.transport.TransportProtos;
47 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; 48 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
48 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent; 49 import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
49 -import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;  
50 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; 50 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
51 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; 51 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
52 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; 52 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
@@ -141,9 +141,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -141,9 +141,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
141 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); 141 processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
142 break; 142 break;
143 case PINGREQ: 143 case PINGREQ:
144 - //TODO: should we push the notification to the rule engine?  
145 if (checkConnected(ctx)) { 144 if (checkConnected(ctx)) {
146 ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); 145 ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
  146 + transportService.reportActivity(sessionInfo);
  147 + if (gatewaySessionHandler != null) {
  148 + gatewaySessionHandler.reportActivity();
  149 + }
147 } 150 }
148 break; 151 break;
149 case DISCONNECT: 152 case DISCONNECT:
@@ -394,7 +397,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -394,7 +397,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
394 private void processDisconnect(ChannelHandlerContext ctx) { 397 private void processDisconnect(ChannelHandlerContext ctx) {
395 ctx.close(); 398 ctx.close();
396 if (deviceSessionCtx.isConnected()) { 399 if (deviceSessionCtx.isConnected()) {
397 - transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null); 400 + transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
398 transportService.deregisterSession(sessionInfo); 401 transportService.deregisterSession(sessionInfo);
399 if (gatewaySessionHandler != null) { 402 if (gatewaySessionHandler != null) {
400 gatewaySessionHandler.onGatewayDisconnect(); 403 gatewaySessionHandler.onGatewayDisconnect();
@@ -466,16 +469,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -466,16 +469,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
466 } 469 }
467 } 470 }
468 471
469 - public static SessionEventMsg getSessionEventMsg(SessionEvent event) {  
470 - return SessionEventMsg.newBuilder()  
471 - .setSessionType(TransportProtos.SessionType.ASYNC)  
472 - .setEvent(event).build();  
473 - }  
474 -  
475 @Override 472 @Override
476 public void operationComplete(Future<? super Void> future) throws Exception { 473 public void operationComplete(Future<? super Void> future) throws Exception {
477 if (deviceSessionCtx.isConnected()) { 474 if (deviceSessionCtx.isConnected()) {
478 - transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null); 475 + transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
479 transportService.deregisterSession(sessionInfo); 476 transportService.deregisterSession(sessionInfo);
480 } 477 }
481 } 478 }
@@ -495,7 +492,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -495,7 +492,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
495 .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) 492 .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
496 .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) 493 .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
497 .build(); 494 .build();
498 - transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null); 495 + transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.OPEN), null);
499 transportService.registerAsyncSession(sessionInfo, this); 496 transportService.registerAsyncSession(sessionInfo, this);
500 checkGatewaySession(); 497 checkGatewaySession();
501 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); 498 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
@@ -34,6 +34,7 @@ import org.thingsboard.server.common.transport.TransportService; @@ -34,6 +34,7 @@ import org.thingsboard.server.common.transport.TransportService;
34 import org.thingsboard.server.common.transport.TransportServiceCallback; 34 import org.thingsboard.server.common.transport.TransportServiceCallback;
35 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 35 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
36 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 36 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  37 +import org.thingsboard.server.common.transport.service.AbstractTransportService;
37 import org.thingsboard.server.gen.transport.TransportProtos; 38 import org.thingsboard.server.gen.transport.TransportProtos;
38 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; 39 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
39 import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; 40 import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
@@ -118,7 +119,7 @@ public class GatewaySessionHandler { @@ -118,7 +119,7 @@ public class GatewaySessionHandler {
118 GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); 119 GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
119 if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { 120 if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
120 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); 121 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
121 - transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); 122 + transportService.process(deviceSessionInfo, AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
122 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); 123 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
123 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); 124 transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
124 transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); 125 transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
@@ -334,7 +335,7 @@ public class GatewaySessionHandler { @@ -334,7 +335,7 @@ public class GatewaySessionHandler {
334 335
335 private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) { 336 private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) {
336 transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); 337 transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
337 - transportService.process(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); 338 + transportService.process(deviceSessionCtx.getSessionInfo(), AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
338 log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); 339 log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
339 } 340 }
340 341
@@ -360,11 +361,15 @@ public class GatewaySessionHandler { @@ -360,11 +361,15 @@ public class GatewaySessionHandler {
360 return context; 361 return context;
361 } 362 }
362 363
363 - public MqttTransportAdaptor getAdaptor() { 364 + MqttTransportAdaptor getAdaptor() {
364 return context.getAdaptor(); 365 return context.getAdaptor();
365 } 366 }
366 367
367 - public int nextMsgId() { 368 + int nextMsgId() {
368 return deviceSessionCtx.nextMsgId(); 369 return deviceSessionCtx.nextMsgId();
369 } 370 }
  371 +
  372 + public void reportActivity() {
  373 + devices.forEach((id, deviceCtx) -> transportService.reportActivity(deviceCtx.getSessionInfo()));
  374 + }
370 } 375 }
@@ -61,10 +61,14 @@ public interface TransportService { @@ -61,10 +61,14 @@ public interface TransportService {
61 61
62 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback); 62 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
63 63
  64 + void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
  65 +
64 void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener); 66 void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
65 67
66 void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout); 68 void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
67 69
  70 + void reportActivity(SessionInfoProto sessionInfo);
  71 +
68 void deregisterSession(SessionInfoProto sessionInfo); 72 void deregisterSession(SessionInfoProto sessionInfo);
69 73
70 } 74 }
@@ -47,9 +47,14 @@ public abstract class AbstractTransportService implements TransportService { @@ -47,9 +47,14 @@ public abstract class AbstractTransportService implements TransportService {
47 private String perTenantLimitsConf; 47 private String perTenantLimitsConf;
48 @Value("${transport.rate_limits.tenant}") 48 @Value("${transport.rate_limits.tenant}")
49 private String perDevicesLimitsConf; 49 private String perDevicesLimitsConf;
  50 + @Value("${transport.sessions.inactivity_timeout}")
  51 + private long sessionInactivityTimeout;
  52 + @Value("${transport.sessions.report_timeout}")
  53 + private long sessionReportTimeout;
50 54
51 protected ScheduledExecutorService schedulerExecutor; 55 protected ScheduledExecutorService schedulerExecutor;
52 protected ExecutorService transportCallbackExecutor; 56 protected ExecutorService transportCallbackExecutor;
  57 +
53 private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>(); 58 private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
54 59
55 //TODO: Implement cleanup of this maps. 60 //TODO: Implement cleanup of this maps.
@@ -59,7 +64,81 @@ public abstract class AbstractTransportService implements TransportService { @@ -59,7 +64,81 @@ public abstract class AbstractTransportService implements TransportService {
59 @Override 64 @Override
60 public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) { 65 public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
61 sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener)); 66 sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
62 - //TODO: monitor sessions periodically: PING REQ/RESP, etc. 67 + }
  68 +
  69 + @Override
  70 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
  71 + reportActivityInternal(sessionInfo);
  72 + }
  73 +
  74 + @Override
  75 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
  76 + reportActivityInternal(sessionInfo);
  77 + }
  78 +
  79 + @Override
  80 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
  81 + reportActivityInternal(sessionInfo);
  82 + }
  83 +
  84 + @Override
  85 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
  86 + reportActivityInternal(sessionInfo);
  87 + }
  88 +
  89 + @Override
  90 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
  91 + SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
  92 + sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
  93 + }
  94 +
  95 + @Override
  96 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
  97 + SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
  98 + sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
  99 + }
  100 +
  101 + @Override
  102 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
  103 + reportActivityInternal(sessionInfo);
  104 + }
  105 +
  106 + @Override
  107 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
  108 + reportActivityInternal(sessionInfo);
  109 + }
  110 +
  111 + @Override
  112 + public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
  113 + reportActivityInternal(sessionInfo);
  114 + }
  115 +
  116 + private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
  117 + UUID sessionId = toId(sessionInfo);
  118 + SessionMetaData sessionMetaData = sessions.get(sessionId);
  119 + if (sessionMetaData != null) {
  120 + sessionMetaData.updateLastActivityTime();
  121 + }
  122 + return sessionMetaData;
  123 + }
  124 +
  125 + private void checkInactivityAndReportActivity() {
  126 + long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
  127 + sessions.forEach((uuid, sessionMD) -> {
  128 + if (sessionMD.getLastActivityTime() < expTime) {
  129 + if (log.isDebugEnabled()) {
  130 + log.debug("[{}] Session has expired due to last activity time: {}", toId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime());
  131 + }
  132 + process(sessionMD.getSessionInfo(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
  133 + sessions.remove(uuid);
  134 + sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
  135 + } else {
  136 + process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder()
  137 + .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
  138 + .setRpcSubscription(sessionMD.isSubscribedToRPC())
  139 + .setLastActivityTime(sessionMD.getLastActivityTime()).build(), null);
  140 + }
  141 + });
63 } 142 }
64 143
65 @Override 144 @Override
@@ -131,7 +210,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -131,7 +210,7 @@ public abstract class AbstractTransportService implements TransportService {
131 } 210 }
132 } 211 }
133 212
134 - protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) { 213 + private UUID toId(TransportProtos.SessionInfoProto sessionInfo) {
135 return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); 214 return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
136 } 215 }
137 216
@@ -147,6 +226,7 @@ public abstract class AbstractTransportService implements TransportService { @@ -147,6 +226,7 @@ public abstract class AbstractTransportService implements TransportService {
147 } 226 }
148 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); 227 this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
149 this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); 228 this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
  229 + this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS);
150 } 230 }
151 231
152 public void destroy() { 232 public void destroy() {
@@ -161,4 +241,10 @@ public abstract class AbstractTransportService implements TransportService { @@ -161,4 +241,10 @@ public abstract class AbstractTransportService implements TransportService {
161 transportCallbackExecutor.shutdownNow(); 241 transportCallbackExecutor.shutdownNow();
162 } 242 }
163 } 243 }
  244 +
  245 + public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
  246 + return TransportProtos.SessionEventMsg.newBuilder()
  247 + .setSessionType(TransportProtos.SessionType.ASYNC)
  248 + .setEvent(event).build();
  249 + }
164 } 250 }
@@ -228,6 +228,17 @@ public class RemoteTransportService extends AbstractTransportService { @@ -228,6 +228,17 @@ public class RemoteTransportService extends AbstractTransportService {
228 } 228 }
229 229
230 @Override 230 @Override
  231 + public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
  232 + if (checkLimits(sessionInfo, callback)) {
  233 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  234 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  235 + .setSubscriptionInfo(msg).build()
  236 + ).build();
  237 + send(sessionInfo, toRuleEngineMsg, callback);
  238 + }
  239 + }
  240 +
  241 + @Override
231 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { 242 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
232 if (checkLimits(sessionInfo, callback)) { 243 if (checkLimits(sessionInfo, callback)) {
233 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 244 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
@@ -23,10 +23,25 @@ import org.thingsboard.server.gen.transport.TransportProtos; @@ -23,10 +23,25 @@ import org.thingsboard.server.gen.transport.TransportProtos;
23 * Created by ashvayka on 15.10.18. 23 * Created by ashvayka on 15.10.18.
24 */ 24 */
25 @Data 25 @Data
26 -public class SessionMetaData { 26 +class SessionMetaData {
27 27
28 private final TransportProtos.SessionInfoProto sessionInfo; 28 private final TransportProtos.SessionInfoProto sessionInfo;
29 private final TransportProtos.SessionType sessionType; 29 private final TransportProtos.SessionType sessionType;
30 private final SessionMsgListener listener; 30 private final SessionMsgListener listener;
31 31
  32 + private volatile long lastActivityTime;
  33 + private volatile boolean subscribedToAttributes;
  34 + private volatile boolean subscribedToRPC;
  35 +
  36 + SessionMetaData(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
  37 + this.sessionInfo = sessionInfo;
  38 + this.sessionType = sessionType;
  39 + this.listener = listener;
  40 + this.lastActivityTime = System.currentTimeMillis();
  41 + }
  42 +
  43 + void updateLastActivityTime() {
  44 + this.lastActivityTime = System.currentTimeMillis();
  45 + }
  46 +
32 } 47 }
@@ -23,6 +23,9 @@ transport: @@ -23,6 +23,9 @@ transport:
23 bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" 23 bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
24 bind_port: "${COAP_BIND_PORT:5683}" 24 bind_port: "${COAP_BIND_PORT:5683}"
25 timeout: "${COAP_TIMEOUT:10000}" 25 timeout: "${COAP_TIMEOUT:10000}"
  26 + sessions:
  27 + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
  28 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
26 rate_limits: 29 rate_limits:
27 enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" 30 enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
28 tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" 31 tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
@@ -24,6 +24,9 @@ server: @@ -24,6 +24,9 @@ server:
24 transport: 24 transport:
25 http: 25 http:
26 request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}" 26 request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"
  27 + sessions:
  28 + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
  29 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
27 rate_limits: 30 rate_limits:
28 enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" 31 enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
29 tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" 32 tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"
@@ -44,8 +44,8 @@ transport: @@ -44,8 +44,8 @@ transport:
44 # Type of the key store 44 # Type of the key store
45 key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" 45 key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
46 sessions: 46 sessions:
47 - max_per_tenant: "${TB_TRANSPORT_SESSIONS_MAX_PER_TENANT:1000}"  
48 - max_per_device: "${TB_TRANSPORT_SESSIONS_MAX_PER_DEVICE:2}" 47 + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
  48 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
49 rate_limits: 49 rate_limits:
50 enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" 50 enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}"
51 tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" 51 tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"