Commit 61e1ce44aa4bce2aeb28d5fd1fdfc6f811f1fec6

Authored by vzikratyi
Committed by Andrew Shvayka
1 parent 30210c70

device creation lock upgrade

@@ -71,7 +71,7 @@ public class GatewaySessionHandler { @@ -71,7 +71,7 @@ public class GatewaySessionHandler {
71 private final TransportService transportService; 71 private final TransportService transportService;
72 private final DeviceInfoProto gateway; 72 private final DeviceInfoProto gateway;
73 private final UUID sessionId; 73 private final UUID sessionId;
74 - private final Lock deviceCreationLock = new ReentrantLock(); 74 + private final ConcurrentMap<String, Lock> deviceCreationLockMap;
75 private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices; 75 private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices;
76 private final ConcurrentMap<String, SettableFuture<GatewayDeviceSessionCtx>> deviceFutures; 76 private final ConcurrentMap<String, SettableFuture<GatewayDeviceSessionCtx>> deviceFutures;
77 private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; 77 private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
@@ -86,6 +86,7 @@ public class GatewaySessionHandler { @@ -86,6 +86,7 @@ public class GatewaySessionHandler {
86 this.sessionId = sessionId; 86 this.sessionId = sessionId;
87 this.devices = new ConcurrentHashMap<>(); 87 this.devices = new ConcurrentHashMap<>();
88 this.deviceFutures = new ConcurrentHashMap<>(); 88 this.deviceFutures = new ConcurrentHashMap<>();
  89 + this.deviceCreationLockMap = new ConcurrentHashMap<>();
89 this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); 90 this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
90 this.channel = deviceSessionCtx.getChannel(); 91 this.channel = deviceSessionCtx.getChannel();
91 } 92 }
@@ -113,6 +114,7 @@ public class GatewaySessionHandler { @@ -113,6 +114,7 @@ public class GatewaySessionHandler {
113 private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) { 114 private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
114 GatewayDeviceSessionCtx result = devices.get(deviceName); 115 GatewayDeviceSessionCtx result = devices.get(deviceName);
115 if (result == null) { 116 if (result == null) {
  117 + Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock());
116 deviceCreationLock.lock(); 118 deviceCreationLock.lock();
117 try { 119 try {
118 result = devices.get(deviceName); 120 result = devices.get(deviceName);
@@ -145,6 +147,7 @@ public class GatewaySessionHandler { @@ -145,6 +147,7 @@ public class GatewaySessionHandler {
145 public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { 147 public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {
146 GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); 148 GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
147 if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { 149 if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
  150 + log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
148 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); 151 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
149 transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); 152 transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
150 transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); 153 transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);