Commit 79b4411c69f1c49233ecf21322b3ece695d8ecc5

Authored by Andrii Shvaika
1 parent d78193f2

Synchronization of device connect processing in gateways

@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -69,7 +69,8 @@ public class GatewaySessionHandler { @@ -69,7 +69,8 @@ public class GatewaySessionHandler {
69 private final TransportService transportService; 69 private final TransportService transportService;
70 private final DeviceInfoProto gateway; 70 private final DeviceInfoProto gateway;
71 private final UUID sessionId; 71 private final UUID sessionId;
72 - private final Map<String, GatewayDeviceSessionCtx> devices; 72 + private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices;
  73 + private final ConcurrentMap<String, SettableFuture<GatewayDeviceSessionCtx>> deviceFutures;
73 private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; 74 private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
74 private final ChannelHandlerContext channel; 75 private final ChannelHandlerContext channel;
75 private final DeviceSessionCtx deviceSessionCtx; 76 private final DeviceSessionCtx deviceSessionCtx;
@@ -81,6 +82,7 @@ public class GatewaySessionHandler { @@ -81,6 +82,7 @@ public class GatewaySessionHandler {
81 this.gateway = deviceSessionCtx.getDeviceInfo(); 82 this.gateway = deviceSessionCtx.getDeviceInfo();
82 this.sessionId = sessionId; 83 this.sessionId = sessionId;
83 this.devices = new ConcurrentHashMap<>(); 84 this.devices = new ConcurrentHashMap<>();
  85 + this.deviceFutures = new ConcurrentHashMap<>();
84 this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); 86 this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
85 this.channel = deviceSessionCtx.getChannel(); 87 this.channel = deviceSessionCtx.getChannel();
86 } 88 }
@@ -106,35 +108,51 @@ public class GatewaySessionHandler { @@ -106,35 +108,51 @@ public class GatewaySessionHandler {
106 } 108 }
107 109
108 private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) { 110 private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
109 - SettableFuture<GatewayDeviceSessionCtx> future = SettableFuture.create(); 111 + SettableFuture<GatewayDeviceSessionCtx> future;
110 GatewayDeviceSessionCtx result = devices.get(deviceName); 112 GatewayDeviceSessionCtx result = devices.get(deviceName);
111 if (result == null) { 113 if (result == null) {
112 - transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()  
113 - .setDeviceName(deviceName)  
114 - .setDeviceType(deviceType)  
115 - .setGatewayIdMSB(gateway.getDeviceIdMSB())  
116 - .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(),  
117 - new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {  
118 - @Override  
119 - public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {  
120 - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);  
121 - if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {  
122 - SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();  
123 - transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);  
124 - transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);  
125 - transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);  
126 - transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);  
127 - }  
128 - future.set(devices.get(deviceName));  
129 - }  
130 -  
131 - @Override  
132 - public void onError(Throwable e) {  
133 - log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);  
134 - future.setException(e);  
135 - }  
136 - }); 114 + synchronized (deviceFutures) {
  115 + future = deviceFutures.get(deviceName);
  116 + if (future == null) {
  117 + final SettableFuture<GatewayDeviceSessionCtx> futureToSet = SettableFuture.create();
  118 + deviceFutures.put(deviceName, futureToSet);
  119 + future = futureToSet;
  120 + try {
  121 + transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
  122 + .setDeviceName(deviceName)
  123 + .setDeviceType(deviceType)
  124 + .setGatewayIdMSB(gateway.getDeviceIdMSB())
  125 + .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(),
  126 + new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {
  127 + @Override
  128 + public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {
  129 + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
  130 + if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
  131 + SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
  132 + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
  133 + transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
  134 + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
  135 + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
  136 + }
  137 + futureToSet.set(devices.get(deviceName));
  138 + deviceFutures.remove(deviceName);
  139 + }
  140 +
  141 + @Override
  142 + public void onError(Throwable e) {
  143 + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
  144 + futureToSet.setException(e);
  145 + deviceFutures.remove(deviceName);
  146 + }
  147 + });
  148 + } catch (Throwable e) {
  149 + deviceFutures.remove(deviceName);
  150 + throw e;
  151 + }
  152 + }
  153 + }
137 } else { 154 } else {
  155 + future = SettableFuture.create();
138 future.set(result); 156 future.set(result);
139 } 157 }
140 return future; 158 return future;