Commit a44230a253fb47eb5d3be8663ff115bf10372afb

Authored by Sergey Matvienko
Committed by Andrew Shvayka
1 parent 57b0e19e

DeviceActorMessageProcessor: introduced LinkedHashMapRemoveEldest with BiConsume…

…r instead many ensureSessionsCapacity() that spreaded over the class. test added
@@ -24,6 +24,7 @@ import com.google.protobuf.InvalidProtocolBufferException; @@ -24,6 +24,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
24 import lombok.extern.slf4j.Slf4j; 24 import lombok.extern.slf4j.Slf4j;
25 import org.apache.commons.collections.CollectionUtils; 25 import org.apache.commons.collections.CollectionUtils;
26 import org.thingsboard.common.util.JacksonUtil; 26 import org.thingsboard.common.util.JacksonUtil;
  27 +import org.thingsboard.common.util.LinkedHashMapRemoveEldest;
27 import org.thingsboard.rule.engine.api.RpcError; 28 import org.thingsboard.rule.engine.api.RpcError;
28 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; 29 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
29 import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg; 30 import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
@@ -112,7 +113,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -112,7 +113,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
112 113
113 final TenantId tenantId; 114 final TenantId tenantId;
114 final DeviceId deviceId; 115 final DeviceId deviceId;
115 - private final LinkedHashMap<UUID, SessionInfoMetaData> sessions; 116 + final LinkedHashMapRemoveEldest<UUID, SessionInfoMetaData> sessions;
116 private final Map<UUID, SessionInfo> attributeSubscriptions; 117 private final Map<UUID, SessionInfo> attributeSubscriptions;
117 private final Map<UUID, SessionInfo> rpcSubscriptions; 118 private final Map<UUID, SessionInfo> rpcSubscriptions;
118 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap; 119 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
@@ -127,16 +128,16 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -127,16 +128,16 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
127 super(systemContext); 128 super(systemContext);
128 this.tenantId = tenantId; 129 this.tenantId = tenantId;
129 this.deviceId = deviceId; 130 this.deviceId = deviceId;
130 - this.sessions = new LinkedHashMap<>();  
131 this.attributeSubscriptions = new HashMap<>(); 131 this.attributeSubscriptions = new HashMap<>();
132 this.rpcSubscriptions = new HashMap<>(); 132 this.rpcSubscriptions = new HashMap<>();
133 this.toDeviceRpcPendingMap = new HashMap<>(); 133 this.toDeviceRpcPendingMap = new HashMap<>();
  134 + this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSession);
134 if (initAttributes()) { 135 if (initAttributes()) {
135 restoreSessions(); 136 restoreSessions();
136 } 137 }
137 } 138 }
138 139
139 - private boolean initAttributes() { 140 + boolean initAttributes() {
140 Device device = systemContext.getDeviceService().findDeviceById(tenantId, deviceId); 141 Device device = systemContext.getDeviceService().findDeviceById(tenantId, deviceId);
141 if (device != null) { 142 if (device != null) {
142 this.deviceName = device.getName(); 143 this.deviceName = device.getName();
@@ -562,8 +563,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -562,8 +563,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
562 } 563 }
563 log.info("[{}] Processing new session [{}]. Current sessions size {}", deviceId, sessionId, sessions.size()); 564 log.info("[{}] Processing new session [{}]. Current sessions size {}", deviceId, sessionId, sessions.size());
564 565
565 - ensureSessionsCapacity();  
566 -  
567 sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId()))); 566 sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
568 if (sessions.size() == 1) { 567 if (sessions.size() == 1) {
569 reportSessionOpen(); 568 reportSessionOpen();
@@ -582,24 +581,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -582,24 +581,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
582 } 581 }
583 } 582 }
584 583
585 - private void ensureSessionsCapacity() {  
586 - while (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {  
587 - Optional<UUID> sessionIdToRemove = sessions.keySet().stream().findFirst();  
588 - if (sessionIdToRemove.isPresent()) {  
589 - notifyTransportAboutClosedSession(sessionIdToRemove.get(), sessions.remove(sessionIdToRemove.get()), "max concurrent sessions limit reached per device!");  
590 - } else {  
591 - log.warn("[{}] Can't remove session because find first returns null", deviceId);  
592 - }  
593 - }  
594 - log.debug("[{}] sessions size after clean up {}", deviceId, sessions.size());  
595 - }  
596 -  
597 private void handleSessionActivity(TbActorCtx context, SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) { 584 private void handleSessionActivity(TbActorCtx context, SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) {
598 UUID sessionId = getSessionId(sessionInfoProto); 585 UUID sessionId = getSessionId(sessionInfoProto);
599 Objects.requireNonNull(sessionId); 586 Objects.requireNonNull(sessionId);
600 587
601 - ensureSessionsCapacity();  
602 -  
603 SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId, 588 SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
604 id -> new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId()), subscriptionInfo.getLastActivityTime())); 589 id -> new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId()), subscriptionInfo.getLastActivityTime()));
605 590
@@ -777,7 +762,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -777,7 +762,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
777 return builder.build(); 762 return builder.build();
778 } 763 }
779 764
780 - private void restoreSessions() { 765 + void restoreSessions() {
781 log.debug("[{}] Restoring sessions from cache", deviceId); 766 log.debug("[{}] Restoring sessions from cache", deviceId);
782 DeviceSessionsCacheEntry sessionsDump = null; 767 DeviceSessionsCacheEntry sessionsDump = null;
783 try { 768 try {
@@ -809,13 +794,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -809,13 +794,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
809 log.debug("[{}] Restored session: {}", deviceId, sessionMD); 794 log.debug("[{}] Restored session: {}", deviceId, sessionMD);
810 } 795 }
811 log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); 796 log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
812 -  
813 - ensureSessionsCapacity();  
814 } 797 }
815 798
816 private void dumpSessions() { 799 private void dumpSessions() {
817 - ensureSessionsCapacity();  
818 -  
819 log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); 800 log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
820 List<SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size()); 801 List<SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
821 sessions.forEach((uuid, sessionMD) -> { 802 sessions.forEach((uuid, sessionMD) -> {
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors.device;
  17 +
  18 +import org.junit.Before;
  19 +import org.junit.Test;
  20 +import org.thingsboard.common.util.LinkedHashMapRemoveEldest;
  21 +import org.thingsboard.server.actors.ActorSystemContext;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.dao.device.DeviceService;
  25 +
  26 +import static org.hamcrest.CoreMatchers.instanceOf;
  27 +import static org.hamcrest.CoreMatchers.is;
  28 +import static org.hamcrest.CoreMatchers.notNullValue;
  29 +import static org.hamcrest.MatcherAssert.assertThat;
  30 +import static org.mockito.BDDMockito.willReturn;
  31 +import static org.mockito.Mockito.mock;
  32 +
  33 +public class DeviceActorMessageProcessorTest {
  34 +
  35 + public static final long MAX_CONCURRENT_SESSIONS_PER_DEVICE = 10L;
  36 + ActorSystemContext systemContext;
  37 + DeviceService deviceService;
  38 + TenantId tenantId = TenantId.SYS_TENANT_ID;
  39 + DeviceId deviceId = DeviceId.fromString("78bf9b26-74ef-4af2-9cfb-ad6cf24ad2ec");
  40 +
  41 + DeviceActorMessageProcessor processor;
  42 +
  43 + @Before
  44 + public void setUp() {
  45 + systemContext = mock(ActorSystemContext.class);
  46 + deviceService = mock(DeviceService.class);
  47 + willReturn(MAX_CONCURRENT_SESSIONS_PER_DEVICE).given(systemContext).getMaxConcurrentSessionsPerDevice();
  48 + willReturn(deviceService).given(systemContext).getDeviceService();
  49 + processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId);
  50 + }
  51 +
  52 + @Test
  53 + public void givenSystemContext_whenNewInstance_thenVerifySessionMapMaxSize() {
  54 + assertThat(processor.sessions, instanceOf(LinkedHashMapRemoveEldest.class));
  55 + assertThat(processor.sessions.getMaxEntries(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
  56 + assertThat(processor.sessions.getRemovalConsumer(), notNullValue());
  57 + }
  58 +}