Commit 7eb55b9040982b0577b514d5a16ec226b48992c7

Authored by Andrii Shvaika
2 parents 601599f7 a51c00bd

Merge remote-tracking branch 'origin/master' into develop/3.3.1

... ... @@ -638,20 +638,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
638 638 UUID sessionId = getSessionId(sessionInfoProto);
639 639 Objects.requireNonNull(sessionId);
640 640
641   - SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
642   - id -> new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId()), subscriptionInfo.getLastActivityTime()));
643   -
644   - sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
645   - sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
646   - sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
647   - if (subscriptionInfo.getAttributeSubscription()) {
648   - attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
649   - }
650   - if (subscriptionInfo.getRpcSubscription()) {
651   - rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
  641 + SessionInfoMetaData sessionMD = sessions.get(sessionId);
  642 + if (sessionMD != null) {
  643 + sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
  644 + sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
  645 + sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
  646 + if (subscriptionInfo.getAttributeSubscription()) {
  647 + attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
  648 + }
  649 + if (subscriptionInfo.getRpcSubscription()) {
  650 + rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
  651 + }
652 652 }
653 653 systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, subscriptionInfo.getLastActivityTime());
654   - dumpSessions();
  654 + if (sessionMD != null) {
  655 + dumpSessions();
  656 + }
655 657 }
656 658
657 659 void processCredentialsUpdate(TbActorMsg msg) {
... ...
... ... @@ -563,7 +563,7 @@ js:
563 563 transport:
564 564 sessions:
565 565 inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
566   - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
  566 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
567 567 json:
568 568 # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
569 569 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
... ...
... ... @@ -115,7 +115,6 @@ public class DeviceApiController implements TbTransportService {
115 115 TransportService transportService = transportContext.getTransportService();
116 116 transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
117 117 new HttpOkCallback(responseWriter));
118   - reportActivity(sessionInfo);
119 118 }));
120 119 return responseWriter;
121 120 }
... ... @@ -129,7 +128,6 @@ public class DeviceApiController implements TbTransportService {
129 128 TransportService transportService = transportContext.getTransportService();
130 129 transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
131 130 new HttpOkCallback(responseWriter));
132   - reportActivity(sessionInfo);
133 131 }));
134 132 return responseWriter;
135 133 }
... ... @@ -419,14 +417,6 @@ public class DeviceApiController implements TbTransportService {
419 417
420 418 }
421 419
422   - private void reportActivity(SessionInfoProto sessionInfo) {
423   - transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
424   - .setAttributeSubscription(false)
425   - .setRpcSubscription(false)
426   - .setLastActivityTime(System.currentTimeMillis())
427   - .build(), TransportServiceCallback.EMPTY);
428   - }
429   -
430 420 private static MediaType parseMediaType(String contentType) {
431 421 try {
432 422 return MediaType.parseMediaType(contentType);
... ...
... ... @@ -908,7 +908,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
908 908 * @param sessionInfo -
909 909 */
910 910 private void reportActivityAndRegister(SessionInfoProto sessionInfo) {
911   - if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) {
  911 + if (sessionInfo != null && !transportService.hasSession(sessionInfo)) {
912 912 sessionManager.register(sessionInfo);
913 913 this.reportActivitySubscription(sessionInfo);
914 914 }
... ...
... ... @@ -126,7 +126,7 @@ public interface TransportService {
126 126
127 127 SessionMetaData registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
128 128
129   - SessionMetaData reportActivity(SessionInfoProto sessionInfo);
  129 + void reportActivity(SessionInfoProto sessionInfo);
130 130
131 131 void deregisterSession(SessionInfoProto sessionInfo);
132 132
... ... @@ -135,4 +135,6 @@ public interface TransportService {
135 135 void notifyAboutUplink(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg build, TransportServiceCallback<Void> empty);
136 136
137 137 ExecutorService getCallbackExecutor();
  138 +
  139 + boolean hasSession(SessionInfoProto sessionInfo);
138 140 }
... ...
... ... @@ -95,10 +95,12 @@ import org.thingsboard.server.queue.util.TbTransportComponent;
95 95 import javax.annotation.PostConstruct;
96 96 import javax.annotation.PreDestroy;
97 97 import java.util.Collections;
  98 +import java.util.HashSet;
98 99 import java.util.List;
99 100 import java.util.Map;
100 101 import java.util.Optional;
101 102 import java.util.Random;
  103 +import java.util.Set;
102 104 import java.util.UUID;
103 105 import java.util.concurrent.ConcurrentHashMap;
104 106 import java.util.concurrent.ConcurrentMap;
... ... @@ -162,6 +164,7 @@ public class DefaultTransportService implements TransportService {
162 164 private ExecutorService mainConsumerExecutor;
163 165
164 166 private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
  167 + private final ConcurrentMap<UUID, SessionActivityData> sessionsActivity = new ConcurrentHashMap<>();
165 168 private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>();
166 169
167 170 private volatile boolean stopped = false;
... ... @@ -545,8 +548,11 @@ public class DefaultTransportService implements TransportService {
545 548 @Override
546 549 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
547 550 if (checkLimits(sessionInfo, msg, callback)) {
548   - SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
549   - sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
  551 + SessionMetaData sessionMetaData = sessions.get(toSessionId(sessionInfo));
  552 + if (sessionMetaData != null) {
  553 + sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
  554 + }
  555 + reportActivityInternal(sessionInfo);
550 556 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(),
551 557 new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
552 558 }
... ... @@ -555,8 +561,11 @@ public class DefaultTransportService implements TransportService {
555 561 @Override
556 562 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
557 563 if (checkLimits(sessionInfo, msg, callback)) {
558   - SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
559   - sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
  564 + SessionMetaData sessionMetaData = sessions.get(toSessionId(sessionInfo));
  565 + if (sessionMetaData != null) {
  566 + sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
  567 + }
  568 + reportActivityInternal(sessionInfo);
560 569 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(),
561 570 new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
562 571 }
... ... @@ -668,52 +677,61 @@ public class DefaultTransportService implements TransportService {
668 677 }
669 678
670 679 @Override
671   - public SessionMetaData reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
672   - return reportActivityInternal(sessionInfo);
  680 + public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
  681 + reportActivityInternal(sessionInfo);
673 682 }
674 683
675   - private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
  684 + private void reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
676 685 UUID sessionId = toSessionId(sessionInfo);
677   - SessionMetaData sessionMetaData = sessions.get(sessionId);
678   - if (sessionMetaData != null) {
679   - sessionMetaData.updateLastActivityTime();
680   - }
681   - return sessionMetaData;
  686 + SessionActivityData sessionMetaData = sessionsActivity.computeIfAbsent(sessionId, id -> new SessionActivityData(sessionInfo));
  687 + sessionMetaData.updateLastActivityTime();
682 688 }
683 689
684 690 private void checkInactivityAndReportActivity() {
685 691 long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
686   - sessions.forEach((uuid, sessionMD) -> {
687   - long lastActivityTime = sessionMD.getLastActivityTime();
688   - TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
689   - if (sessionInfo.getGwSessionIdMSB() != 0 &&
690   - sessionInfo.getGwSessionIdLSB() != 0) {
691   - SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
  692 + Set<UUID> sessionsToRemove = new HashSet<>();
  693 + sessionsActivity.forEach((uuid, sessionAD) -> {
  694 + long lastActivityTime = sessionAD.getLastActivityTime();
  695 + SessionMetaData sessionMD = sessions.get(uuid);
  696 + if (sessionMD != null) {
  697 + sessionAD.setSessionInfo(sessionMD.getSessionInfo());
  698 + } else {
  699 + sessionsToRemove.add(uuid);
  700 + }
  701 + TransportProtos.SessionInfoProto sessionInfo = sessionAD.getSessionInfo();
  702 +
  703 + if (sessionInfo.getGwSessionIdMSB() != 0 && sessionInfo.getGwSessionIdLSB() != 0) {
  704 + var gwSessionId = new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB());
  705 + SessionMetaData gwMetaData = sessions.get(gwSessionId);
  706 + SessionActivityData gwActivityData = sessionsActivity.get(gwSessionId);
692 707 if (gwMetaData != null && gwMetaData.isOverwriteActivityTime()) {
693   - lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
  708 + lastActivityTime = Math.max(gwActivityData.getLastActivityTime(), lastActivityTime);
694 709 }
695 710 }
696 711 if (lastActivityTime < expTime) {
697   - if (log.isDebugEnabled()) {
698   - log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
  712 + if (sessionMD != null) {
  713 + if (log.isDebugEnabled()) {
  714 + log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
  715 + }
  716 + sessions.remove(uuid);
  717 + sessionsToRemove.add(uuid);
  718 + process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
  719 + TransportProtos.SessionCloseNotificationProto sessionCloseNotificationProto = TransportProtos.SessionCloseNotificationProto
  720 + .newBuilder()
  721 + .setMessage("Session has expired due to last activity time!")
  722 + .build();
  723 + sessionMD.getListener().onRemoteSessionCloseCommand(uuid, sessionCloseNotificationProto);
699 724 }
700   - process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
701   - sessions.remove(uuid);
702   - TransportProtos.SessionCloseNotificationProto sessionCloseNotificationProto = TransportProtos.SessionCloseNotificationProto
703   - .newBuilder()
704   - .setMessage("session has expired due to last activity time!")
705   - .build();
706   - sessionMD.getListener().onRemoteSessionCloseCommand(uuid, sessionCloseNotificationProto);
707 725 } else {
708   - if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
  726 + if (lastActivityTime > sessionAD.getLastReportedActivityTime()) {
709 727 final long lastActivityTimeFinal = lastActivityTime;
710 728 process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
711   - .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
712   - .setRpcSubscription(sessionMD.isSubscribedToRPC())
  729 + .setAttributeSubscription(sessionMD != null && sessionMD.isSubscribedToAttributes())
  730 + .setRpcSubscription(sessionMD != null && sessionMD.isSubscribedToRPC())
713 731 .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
714 732 @Override
715 733 public void onSuccess(Void msg) {
716   - sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
  734 + sessionAD.setLastReportedActivityTime(lastActivityTimeFinal);
717 735 }
718 736
719 737 @Override
... ... @@ -724,6 +742,8 @@ public class DefaultTransportService implements TransportService {
724 742 }
725 743 }
726 744 });
  745 + // Removes all closed or short-lived sessions.
  746 + sessionsToRemove.forEach(sessionsActivity::remove);
727 747 }
728 748
729 749 @Override
... ... @@ -1146,4 +1166,9 @@ public class DefaultTransportService implements TransportService {
1146 1166 public ExecutorService getCallbackExecutor() {
1147 1167 return transportCallbackExecutor;
1148 1168 }
  1169 +
  1170 + @Override
  1171 + public boolean hasSession(TransportProtos.SessionInfoProto sessionInfo) {
  1172 + return sessions.containsKey(toSessionId(sessionInfo));
  1173 + }
1149 1174 }
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport.service;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.common.transport.SessionMsgListener;
  20 +import org.thingsboard.server.gen.transport.TransportProtos;
  21 +
  22 +import java.util.concurrent.ScheduledFuture;
  23 +
  24 +/**
  25 + * Created by ashvayka on 15.10.18.
  26 + */
  27 +@Data
  28 +public class SessionActivityData {
  29 +
  30 + private volatile TransportProtos.SessionInfoProto sessionInfo;
  31 + private volatile long lastActivityTime;
  32 + private volatile long lastReportedActivityTime;
  33 +
  34 + SessionActivityData(TransportProtos.SessionInfoProto sessionInfo) {
  35 + this.sessionInfo = sessionInfo;
  36 + }
  37 +
  38 + void updateLastActivityTime() {
  39 + this.lastActivityTime = System.currentTimeMillis();
  40 + }
  41 +
  42 +}
... ...
... ... @@ -32,8 +32,6 @@ public class SessionMetaData {
32 32 private final SessionMsgListener listener;
33 33
34 34 private volatile ScheduledFuture scheduledFuture;
35   - private volatile long lastActivityTime;
36   - private volatile long lastReportedActivityTime;
37 35 private volatile boolean subscribedToAttributes;
38 36 private volatile boolean subscribedToRPC;
39 37 private volatile boolean overwriteActivityTime;
... ... @@ -42,14 +40,9 @@ public class SessionMetaData {
42 40 this.sessionInfo = sessionInfo;
43 41 this.sessionType = sessionType;
44 42 this.listener = listener;
45   - this.lastActivityTime = System.currentTimeMillis();
46 43 this.scheduledFuture = null;
47 44 }
48 45
49   - void updateLastActivityTime() {
50   - this.lastActivityTime = System.currentTimeMillis();
51   - }
52   -
53 46 void setScheduledFuture(ScheduledFuture scheduledFuture) {
54 47 this.scheduledFuture = scheduledFuture;
55 48 }
... ...
... ... @@ -113,7 +113,7 @@ transport:
113 113 dtls_session_report_timeout: "${TB_COAP_X509_DTLS_SESSION_REPORT_TIMEOUT:1800000}"
114 114 sessions:
115 115 inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
116   - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
  116 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
117 117 json:
118 118 # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
119 119 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
... ...
... ... @@ -85,7 +85,7 @@ transport:
85 85 max_request_timeout: "${HTTP_MAX_REQUEST_TIMEOUT:300000}"
86 86 sessions:
87 87 inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
88   - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
  88 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
89 89 json:
90 90 # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
91 91 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
... ...
... ... @@ -87,7 +87,7 @@ redis:
87 87 transport:
88 88 sessions:
89 89 inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
90   - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
  90 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
91 91 json:
92 92 # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
93 93 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:false}"
... ...
... ... @@ -118,7 +118,7 @@ transport:
118 118 skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}"
119 119 sessions:
120 120 inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
121   - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
  121 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
122 122 json:
123 123 # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
124 124 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
... ...
... ... @@ -50,7 +50,7 @@ transport:
50 50 underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
51 51 sessions:
52 52 inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
53   - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
  53 + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
54 54 json:
55 55 # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
56 56 type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"
... ...