Commit 529608e60fffd83484685cc18b1e641701d89b99

Authored by Andrii Shvaika
1 parent 63406b01

Improved transport cache invalidation for device updates

@@ -72,6 +72,7 @@ import java.io.IOException; @@ -72,6 +72,7 @@ import java.io.IOException;
72 import java.net.InetSocketAddress; 72 import java.net.InetSocketAddress;
73 import java.util.ArrayList; 73 import java.util.ArrayList;
74 import java.util.List; 74 import java.util.List;
  75 +import java.util.Optional;
75 import java.util.UUID; 76 import java.util.UUID;
76 import java.util.concurrent.ConcurrentHashMap; 77 import java.util.concurrent.ConcurrentHashMap;
77 import java.util.concurrent.ConcurrentMap; 78 import java.util.concurrent.ConcurrentMap;
@@ -701,12 +702,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -701,12 +702,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
701 } 702 }
702 703
703 @Override 704 @Override
704 - public void onProfileUpdate(DeviceProfile deviceProfile) {  
705 - deviceSessionCtx.onProfileUpdate(deviceProfile); 705 + public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
  706 + deviceSessionCtx.onDeviceProfileUpdate(sessionInfo, deviceProfile);
706 } 707 }
707 708
708 @Override 709 @Override
709 - public void onDeviceProfileUpdate(Device device, TransportProtos.SessionInfoProto sessionInfo) {  
710 - deviceSessionCtx.onDeviceProfileUpdate(device, sessionInfo); 710 + public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
  711 + deviceSessionCtx.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
711 } 712 }
712 } 713 }
@@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportC
27 import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; 27 import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
28 import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration; 28 import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
29 import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; 29 import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
  30 +import org.thingsboard.server.gen.transport.TransportProtos;
30 import org.thingsboard.server.transport.mqtt.MqttTransportContext; 31 import org.thingsboard.server.transport.mqtt.MqttTransportContext;
31 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; 32 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
32 import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter; 33 import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter;
@@ -108,8 +109,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { @@ -108,8 +109,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
108 } 109 }
109 110
110 @Override 111 @Override
111 - public void onProfileUpdate(DeviceProfile deviceProfile) {  
112 - super.onProfileUpdate(deviceProfile); 112 + public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
  113 + super.onDeviceProfileUpdate(sessionInfo, deviceProfile);
113 updateTopicFilters(deviceProfile); 114 updateTopicFilters(deviceProfile);
114 } 115 }
115 116
@@ -24,6 +24,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponse @@ -24,6 +24,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponse
24 import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto; 24 import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
26 26
  27 +import java.util.Optional;
  28 +
27 /** 29 /**
28 * Created by ashvayka on 04.10.18. 30 * Created by ashvayka on 04.10.18.
29 */ 31 */
@@ -39,9 +41,9 @@ public interface SessionMsgListener { @@ -39,9 +41,9 @@ public interface SessionMsgListener {
39 41
40 void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse); 42 void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse);
41 43
42 - default void onProfileUpdate(DeviceProfile deviceProfile) { 44 + default void onDeviceProfileUpdate(TransportProtos.SessionInfoProto newSessionInfo, DeviceProfile deviceProfile) {
43 } 45 }
44 46
45 - default void onDeviceProfileUpdate(Device device, TransportProtos.SessionInfoProto sessionInfo) { 47 + default void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
46 } 48 }
47 } 49 }
@@ -672,9 +672,17 @@ public class DefaultTransportService implements TransportService { @@ -672,9 +672,17 @@ public class DefaultTransportService implements TransportService {
672 long deviceProfileIdMSB = deviceProfile.getId().getId().getMostSignificantBits(); 672 long deviceProfileIdMSB = deviceProfile.getId().getId().getMostSignificantBits();
673 long deviceProfileIdLSB = deviceProfile.getId().getId().getLeastSignificantBits(); 673 long deviceProfileIdLSB = deviceProfile.getId().getId().getLeastSignificantBits();
674 sessions.forEach((id, md) -> { 674 sessions.forEach((id, md) -> {
  675 + //TODO: if transport types are different - we should close the session.
675 if (md.getSessionInfo().getDeviceProfileIdMSB() == deviceProfileIdMSB 676 if (md.getSessionInfo().getDeviceProfileIdMSB() == deviceProfileIdMSB
676 && md.getSessionInfo().getDeviceProfileIdLSB() == deviceProfileIdLSB) { 677 && md.getSessionInfo().getDeviceProfileIdLSB() == deviceProfileIdLSB) {
677 - transportCallbackExecutor.submit(() -> md.getListener().onProfileUpdate(deviceProfile)); 678 + TransportProtos.SessionInfoProto newSessionInfo = TransportProtos.SessionInfoProto.newBuilder()
  679 + .mergeFrom(md.getSessionInfo())
  680 + .setDeviceProfileIdMSB(deviceProfileIdMSB)
  681 + .setDeviceProfileIdLSB(deviceProfileIdLSB)
  682 + .setDeviceType(deviceProfile.getName())
  683 + .build();
  684 + md.setSessionInfo(newSessionInfo);
  685 + transportCallbackExecutor.submit(() -> md.getListener().onDeviceProfileUpdate(newSessionInfo, deviceProfile));
678 } 686 }
679 }); 687 });
680 } 688 }
@@ -684,28 +692,27 @@ public class DefaultTransportService implements TransportService { @@ -684,28 +692,27 @@ public class DefaultTransportService implements TransportService {
684 long deviceIdLSB = device.getId().getId().getLeastSignificantBits(); 692 long deviceIdLSB = device.getId().getId().getLeastSignificantBits();
685 long deviceProfileIdMSB = device.getDeviceProfileId().getId().getMostSignificantBits(); 693 long deviceProfileIdMSB = device.getDeviceProfileId().getId().getMostSignificantBits();
686 long deviceProfileIdLSB = device.getDeviceProfileId().getId().getLeastSignificantBits(); 694 long deviceProfileIdLSB = device.getDeviceProfileId().getId().getLeastSignificantBits();
687 - for (Map.Entry<UUID, SessionMetaData> entry : sessions.entrySet()) {  
688 - SessionMetaData md = entry.getValue();  
689 - if ((md.getSessionInfo().getDeviceIdMSB() == deviceIdMSB  
690 - && md.getSessionInfo().getDeviceIdLSB() == deviceIdLSB)  
691 - && (md.getSessionInfo().getDeviceProfileIdMSB() != deviceProfileIdMSB  
692 - && md.getSessionInfo().getDeviceProfileIdLSB() != deviceProfileIdLSB)) {  
693 - updateSessionMetadata(device, entry, md); 695 + sessions.forEach((id, md) -> {
  696 + if ((md.getSessionInfo().getDeviceIdMSB() == deviceIdMSB && md.getSessionInfo().getDeviceIdLSB() == deviceIdLSB)) {
  697 + DeviceProfile newDeviceProfile;
  698 + if (md.getSessionInfo().getDeviceProfileIdMSB() != deviceProfileIdMSB
  699 + && md.getSessionInfo().getDeviceProfileIdLSB() != deviceProfileIdLSB) {
  700 + //TODO: if transport types are different - we should close the session.
  701 + newDeviceProfile = deviceProfileCache.get(new DeviceProfileId(new UUID(deviceProfileIdMSB, deviceProfileIdLSB)));
  702 + } else {
  703 + newDeviceProfile = null;
  704 + }
  705 + TransportProtos.SessionInfoProto newSessionInfo = TransportProtos.SessionInfoProto.newBuilder()
  706 + .mergeFrom(md.getSessionInfo())
  707 + .setDeviceProfileIdMSB(deviceProfileIdMSB)
  708 + .setDeviceProfileIdLSB(deviceProfileIdLSB)
  709 + .setDeviceName(device.getName())
  710 + .setDeviceType(device.getType())
  711 + .build();
  712 + md.setSessionInfo(newSessionInfo);
  713 + transportCallbackExecutor.submit(() -> md.getListener().onDeviceUpdate(newSessionInfo, device, Optional.ofNullable(newDeviceProfile)));
694 } 714 }
695 - }  
696 - }  
697 -  
698 - private void updateSessionMetadata(Device device, Map.Entry<UUID, SessionMetaData> entry, SessionMetaData md) {  
699 - TransportProtos.SessionInfoProto newSessionInfo = TransportProtos.SessionInfoProto.newBuilder()  
700 - .mergeFrom(md.getSessionInfo())  
701 - .setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits())  
702 - .setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits())  
703 - .setDeviceType(device.getType())  
704 - .build();  
705 - SessionMetaData newSessionMetaData = new SessionMetaData(newSessionInfo, md.getSessionType(), md.getListener());  
706 - entry.setValue(newSessionMetaData);  
707 - transportCallbackExecutor.submit(() -> newSessionMetaData.getListener().onDeviceProfileUpdate(device,  
708 - newSessionMetaData.getSessionInfo())); 715 + });
709 } 716 }
710 717
711 protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) { 718 protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) {
@@ -27,22 +27,17 @@ import java.util.concurrent.ScheduledFuture; @@ -27,22 +27,17 @@ import java.util.concurrent.ScheduledFuture;
27 @Data 27 @Data
28 class SessionMetaData { 28 class SessionMetaData {
29 29
30 - private final TransportProtos.SessionInfoProto sessionInfo; 30 + private volatile TransportProtos.SessionInfoProto sessionInfo;
31 private final TransportProtos.SessionType sessionType; 31 private final TransportProtos.SessionType sessionType;
32 private final SessionMsgListener listener; 32 private final SessionMsgListener listener;
33 33
34 - private ScheduledFuture scheduledFuture;  
35 - 34 + private volatile ScheduledFuture scheduledFuture;
36 private volatile long lastActivityTime; 35 private volatile long lastActivityTime;
37 private volatile long lastReportedActivityTime; 36 private volatile long lastReportedActivityTime;
38 private volatile boolean subscribedToAttributes; 37 private volatile boolean subscribedToAttributes;
39 private volatile boolean subscribedToRPC; 38 private volatile boolean subscribedToRPC;
40 39
41 - SessionMetaData(  
42 - TransportProtos.SessionInfoProto sessionInfo,  
43 - TransportProtos.SessionType sessionType,  
44 - SessionMsgListener listener  
45 - ) { 40 + SessionMetaData(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
46 this.sessionInfo = sessionInfo; 41 this.sessionInfo = sessionInfo;
47 this.sessionType = sessionType; 42 this.sessionType = sessionType;
48 this.listener = listener; 43 this.listener = listener;
@@ -54,11 +49,15 @@ class SessionMetaData { @@ -54,11 +49,15 @@ class SessionMetaData {
54 this.lastActivityTime = System.currentTimeMillis(); 49 this.lastActivityTime = System.currentTimeMillis();
55 } 50 }
56 51
57 - void setScheduledFuture(ScheduledFuture scheduledFuture) { this.scheduledFuture = scheduledFuture; } 52 + void setScheduledFuture(ScheduledFuture scheduledFuture) {
  53 + this.scheduledFuture = scheduledFuture;
  54 + }
58 55
59 public ScheduledFuture getScheduledFuture() { 56 public ScheduledFuture getScheduledFuture() {
60 return scheduledFuture; 57 return scheduledFuture;
61 } 58 }
62 59
63 - public boolean hasScheduledFuture() { return null != this.scheduledFuture; } 60 + public boolean hasScheduledFuture() {
  61 + return null != this.scheduledFuture;
  62 + }
64 } 63 }
@@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.id.DeviceId; @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
24 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; 24 import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
25 import org.thingsboard.server.gen.transport.TransportProtos; 25 import org.thingsboard.server.gen.transport.TransportProtos;
26 26
  27 +import java.util.Optional;
27 import java.util.UUID; 28 import java.util.UUID;
28 29
29 /** 30 /**
@@ -58,16 +59,19 @@ public abstract class DeviceAwareSessionContext implements SessionContext { @@ -58,16 +59,19 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
58 } 59 }
59 60
60 @Override 61 @Override
61 - public void onProfileUpdate(DeviceProfile deviceProfile) { 62 + public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
  63 + this.sessionInfo = sessionInfo;
62 this.deviceProfile = deviceProfile; 64 this.deviceProfile = deviceProfile;
63 this.deviceInfo.setDeviceType(deviceProfile.getName()); 65 this.deviceInfo.setDeviceType(deviceProfile.getName());
64 - this.sessionInfo = TransportProtos.SessionInfoProto.newBuilder().mergeFrom(sessionInfo).setDeviceType(deviceProfile.getName()).build(); 66 +
65 } 67 }
66 68
67 - public void onDeviceProfileUpdate(Device device, TransportProtos.SessionInfoProto sessionInfo) { 69 + @Override
  70 + public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
  71 + this.sessionInfo = sessionInfo;
68 this.deviceInfo.setDeviceProfileId(device.getDeviceProfileId()); 72 this.deviceInfo.setDeviceProfileId(device.getDeviceProfileId());
69 this.deviceInfo.setDeviceType(device.getType()); 73 this.deviceInfo.setDeviceType(device.getType());
70 - this.sessionInfo = sessionInfo; 74 + deviceProfileOpt.ifPresent(profile -> this.deviceProfile = profile);
71 } 75 }
72 76
73 public boolean isConnected() { 77 public boolean isConnected() {
@@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.Device; @@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.Device;
19 import org.thingsboard.server.common.data.DeviceProfile; 19 import org.thingsboard.server.common.data.DeviceProfile;
20 import org.thingsboard.server.gen.transport.TransportProtos; 20 import org.thingsboard.server.gen.transport.TransportProtos;
21 21
  22 +import java.util.Optional;
22 import java.util.UUID; 23 import java.util.UUID;
23 24
24 public interface SessionContext { 25 public interface SessionContext {
@@ -27,7 +28,7 @@ public interface SessionContext { @@ -27,7 +28,7 @@ public interface SessionContext {
27 28
28 int nextMsgId(); 29 int nextMsgId();
29 30
30 - void onProfileUpdate(DeviceProfile deviceProfile); 31 + void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile);
31 32
32 - void onDeviceProfileUpdate(Device device, TransportProtos.SessionInfoProto sessionInfo); 33 + void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt);
33 } 34 }