Commit a38ee80e0957d52bc21f0a201b081edf4d3420f2

Authored by Andrii Shvaika
1 parent 79b4411c

Refactoring of device creation using gateway

@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -54,6 +54,8 @@ import java.util.Set; @@ -54,6 +54,8 @@ import java.util.Set;
54 import java.util.UUID; 54 import java.util.UUID;
55 import java.util.concurrent.ConcurrentHashMap; 55 import java.util.concurrent.ConcurrentHashMap;
56 import java.util.concurrent.ConcurrentMap; 56 import java.util.concurrent.ConcurrentMap;
  57 +import java.util.concurrent.locks.Lock;
  58 +import java.util.concurrent.locks.ReentrantLock;
57 59
58 /** 60 /**
59 * Created by ashvayka on 19.01.17. 61 * Created by ashvayka on 19.01.17.
@@ -69,6 +71,7 @@ public class GatewaySessionHandler { @@ -69,6 +71,7 @@ public class GatewaySessionHandler {
69 private final TransportService transportService; 71 private final TransportService transportService;
70 private final DeviceInfoProto gateway; 72 private final DeviceInfoProto gateway;
71 private final UUID sessionId; 73 private final UUID sessionId;
  74 + private final Lock deviceCreationLock = new ReentrantLock();
72 private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices; 75 private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices;
73 private final ConcurrentMap<String, SettableFuture<GatewayDeviceSessionCtx>> deviceFutures; 76 private final ConcurrentMap<String, SettableFuture<GatewayDeviceSessionCtx>> deviceFutures;
74 private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; 77 private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
@@ -108,53 +111,70 @@ public class GatewaySessionHandler { @@ -108,53 +111,70 @@ public class GatewaySessionHandler {
108 } 111 }
109 112
110 private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) { 113 private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
111 - SettableFuture<GatewayDeviceSessionCtx> future;  
112 GatewayDeviceSessionCtx result = devices.get(deviceName); 114 GatewayDeviceSessionCtx result = devices.get(deviceName);
113 if (result == null) { 115 if (result == null) {
114 - synchronized (deviceFutures) {  
115 - future = deviceFutures.get(deviceName);  
116 - if (future == null) {  
117 - final SettableFuture<GatewayDeviceSessionCtx> futureToSet = SettableFuture.create();  
118 - deviceFutures.put(deviceName, futureToSet);  
119 - future = futureToSet;  
120 - try {  
121 - transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()  
122 - .setDeviceName(deviceName)  
123 - .setDeviceType(deviceType)  
124 - .setGatewayIdMSB(gateway.getDeviceIdMSB())  
125 - .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(),  
126 - new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {  
127 - @Override  
128 - public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {  
129 - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);  
130 - if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {  
131 - SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();  
132 - transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);  
133 - transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);  
134 - transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);  
135 - transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);  
136 - }  
137 - futureToSet.set(devices.get(deviceName));  
138 - deviceFutures.remove(deviceName);  
139 - }  
140 -  
141 - @Override  
142 - public void onError(Throwable e) {  
143 - log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);  
144 - futureToSet.setException(e);  
145 - deviceFutures.remove(deviceName);  
146 - }  
147 - });  
148 - } catch (Throwable e) {  
149 - deviceFutures.remove(deviceName);  
150 - throw e;  
151 - } 116 + deviceCreationLock.lock();
  117 + try {
  118 + result = devices.get(deviceName);
  119 + if (result == null) {
  120 + return getDeviceCreationFuture(deviceName, deviceType);
  121 + } else {
  122 + return toCompletedFuture(result);
152 } 123 }
  124 + } finally {
  125 + deviceCreationLock.unlock();
153 } 126 }
154 } else { 127 } else {
155 - future = SettableFuture.create();  
156 - future.set(result); 128 + return toCompletedFuture(result);
157 } 129 }
  130 + }
  131 +
  132 + private ListenableFuture<GatewayDeviceSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
  133 + SettableFuture<GatewayDeviceSessionCtx> future = deviceFutures.get(deviceName);
  134 + if (future == null) {
  135 + final SettableFuture<GatewayDeviceSessionCtx> futureToSet = SettableFuture.create();
  136 + deviceFutures.put(deviceName, futureToSet);
  137 + try {
  138 + transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
  139 + .setDeviceName(deviceName)
  140 + .setDeviceType(deviceType)
  141 + .setGatewayIdMSB(gateway.getDeviceIdMSB())
  142 + .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(),
  143 + new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {
  144 + @Override
  145 + public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {
  146 + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
  147 + if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
  148 + SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
  149 + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
  150 + transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
  151 + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
  152 + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
  153 + }
  154 + futureToSet.set(devices.get(deviceName));
  155 + deviceFutures.remove(deviceName);
  156 + }
  157 +
  158 + @Override
  159 + public void onError(Throwable e) {
  160 + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
  161 + futureToSet.setException(e);
  162 + deviceFutures.remove(deviceName);
  163 + }
  164 + });
  165 + return futureToSet;
  166 + } catch (Throwable e) {
  167 + deviceFutures.remove(deviceName);
  168 + throw e;
  169 + }
  170 + } else {
  171 + return future;
  172 + }
  173 + }
  174 +
  175 + private ListenableFuture<GatewayDeviceSessionCtx> toCompletedFuture(GatewayDeviceSessionCtx result) {
  176 + SettableFuture<GatewayDeviceSessionCtx> future = SettableFuture.create();
  177 + future.set(result);
158 return future; 178 return future;
159 } 179 }
160 180