Commit c6800dbd032b3666ae6eb2ab5c061dc6e5a2e39c

Authored by Andrew Shvayka
1 parent 52ef95ac

BugFix for Gateway API and concurrent device creation

@@ -60,14 +60,14 @@ @@ -60,14 +60,14 @@
60 <groupId>org.thingsboard.common</groupId> 60 <groupId>org.thingsboard.common</groupId>
61 <artifactId>transport</artifactId> 61 <artifactId>transport</artifactId>
62 </dependency> 62 </dependency>
63 - <dependency>  
64 - <groupId>org.thingsboard.transport</groupId>  
65 - <artifactId>http</artifactId>  
66 - </dependency>  
67 - <dependency>  
68 - <groupId>org.thingsboard.transport</groupId>  
69 - <artifactId>coap</artifactId>  
70 - </dependency> 63 + <!--<dependency>-->
  64 + <!--<groupId>org.thingsboard.transport</groupId>-->
  65 + <!--<artifactId>http</artifactId>-->
  66 + <!--</dependency>-->
  67 + <!--<dependency>-->
  68 + <!--<groupId>org.thingsboard.transport</groupId>-->
  69 + <!--<artifactId>coap</artifactId>-->
  70 + <!--</dependency>-->
71 <dependency> 71 <dependency>
72 <groupId>org.thingsboard.transport</groupId> 72 <groupId>org.thingsboard.transport</groupId>
73 <artifactId>mqtt-common</artifactId> 73 <artifactId>mqtt-common</artifactId>
@@ -455,6 +455,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -455,6 +455,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
455 private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { 455 private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
456 UUID sessionId = getSessionId(sessionInfo); 456 UUID sessionId = getSessionId(sessionInfo);
457 if (msg.getEvent() == SessionEvent.OPEN) { 457 if (msg.getEvent() == SessionEvent.OPEN) {
  458 + if(sessions.containsKey(sessionId)){
  459 + logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
  460 + return;
  461 + }
458 logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); 462 logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
459 if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { 463 if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
460 UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); 464 UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
@@ -51,6 +51,7 @@ import javax.annotation.PostConstruct; @@ -51,6 +51,7 @@ import javax.annotation.PostConstruct;
51 import java.util.UUID; 51 import java.util.UUID;
52 import java.util.concurrent.ExecutorService; 52 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors; 53 import java.util.concurrent.Executors;
  54 +import java.util.concurrent.locks.ReentrantLock;
54 55
55 /** 56 /**
56 * Created by ashvayka on 05.10.18. 57 * Created by ashvayka on 05.10.18.
@@ -97,6 +98,8 @@ public class RemoteTransportApiService implements TransportApiService { @@ -97,6 +98,8 @@ public class RemoteTransportApiService implements TransportApiService {
97 98
98 private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate; 99 private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
99 100
  101 + private ReentrantLock deviceCreationLock = new ReentrantLock();
  102 +
100 @PostConstruct 103 @PostConstruct
101 public void init() { 104 public void init() {
102 this.transportCallbackExecutor = Executors.newCachedThreadPool(); 105 this.transportCallbackExecutor = Executors.newCachedThreadPool();
@@ -156,23 +159,26 @@ public class RemoteTransportApiService implements TransportApiService { @@ -156,23 +159,26 @@ public class RemoteTransportApiService implements TransportApiService {
156 DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB())); 159 DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
157 ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId); 160 ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);
158 return Futures.transform(gatewayFuture, gateway -> { 161 return Futures.transform(gatewayFuture, gateway -> {
159 - Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), gateway.getName());  
160 - if (device == null) {  
161 - device = new Device();  
162 - device.setTenantId(gateway.getTenantId());  
163 - device.setName(requestMsg.getDeviceName());  
164 - device.setType(requestMsg.getDeviceType());  
165 - device.setCustomerId(gateway.getCustomerId());  
166 - device = deviceService.saveDevice(device);  
167 - relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));  
168 - deviceStateService.onDeviceAdded(device);  
169 - } 162 + deviceCreationLock.lock();
170 try { 163 try {
  164 + Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
  165 + if (device == null) {
  166 + device = new Device();
  167 + device.setTenantId(gateway.getTenantId());
  168 + device.setName(requestMsg.getDeviceName());
  169 + device.setType(requestMsg.getDeviceType());
  170 + device.setCustomerId(gateway.getCustomerId());
  171 + device = deviceService.saveDevice(device);
  172 + relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
  173 + deviceStateService.onDeviceAdded(device);
  174 + }
171 return TransportApiResponseMsg.newBuilder() 175 return TransportApiResponseMsg.newBuilder()
172 .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build(); 176 .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
173 } catch (JsonProcessingException e) { 177 } catch (JsonProcessingException e) {
174 log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e); 178 log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
175 throw new RuntimeException(e); 179 throw new RuntimeException(e);
  180 + } finally {
  181 + deviceCreationLock.unlock();
176 } 182 }
177 }, transportCallbackExecutor); 183 }, transportCallbackExecutor);
178 } 184 }
@@ -122,7 +122,8 @@ public class DeviceApiController { @@ -122,7 +122,8 @@ public class DeviceApiController {
122 @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, 122 @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
123 HttpServletRequest request) { 123 HttpServletRequest request) {
124 124
125 - return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request); 125 +// return subscribe(deviceToken, timeout, new RpcSubscribeMsg(), request);
  126 + return null;
126 } 127 }
127 128
128 @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST) 129 @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
@@ -174,15 +175,15 @@ public class DeviceApiController { @@ -174,15 +175,15 @@ public class DeviceApiController {
174 public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken, 175 public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
175 @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout, 176 @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
176 HttpServletRequest httpRequest) { 177 HttpServletRequest httpRequest) {
177 -  
178 - return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest); 178 + return null;
  179 +// return subscribe(deviceToken, timeout, new AttributesSubscribeMsg(), httpRequest);
179 } 180 }
180 181
181 - private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) {  
182 - DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();  
183 - if (quotaExceeded(httpRequest, responseWriter)) {  
184 - return responseWriter;  
185 - } 182 +// private DeferredResult<ResponseEntity> subscribe(String deviceToken, long timeout, FromDeviceMsg msg, HttpServletRequest httpRequest) {
  183 +// DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
  184 +// if (quotaExceeded(httpRequest, responseWriter)) {
  185 +// return responseWriter;
  186 +// }
186 // HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout); 187 // HttpSessionCtx ctx = getHttpSessionCtx(responseWriter, timeout);
187 // if (ctx.login(new DeviceTokenCredentials(deviceToken))) { 188 // if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
188 // try { 189 // try {
@@ -193,21 +194,22 @@ public class DeviceApiController { @@ -193,21 +194,22 @@ public class DeviceApiController {
193 // } else { 194 // } else {
194 // responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); 195 // responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
195 // } 196 // }
196 - return responseWriter;  
197 - } 197 +// return responseWriter;
  198 +// }
198 199
199 private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter) { 200 private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter) {
200 return getHttpSessionCtx(responseWriter, defaultTimeout); 201 return getHttpSessionCtx(responseWriter, defaultTimeout);
201 } 202 }
202 203
203 private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter, long timeout) { 204 private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter, long timeout) {
204 - return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout); 205 + return null;
  206 +// return new HttpSessionCtx(processor, authService, responseWriter, timeout != 0 ? timeout : defaultTimeout);
205 } 207 }
206 208
207 - private void process(HttpSessionCtx ctx, FromDeviceMsg request) {  
208 - AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);  
209 -// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));  
210 - } 209 +// private void process(HttpSessionCtx ctx, FromDeviceMsg request) {
  210 +// AdaptorToSessionActorMsg msg = new BasicAdaptorToSessionActorMsg(ctx, request);
  211 +//// processor.process(new BasicTransportToDeviceSessionActorMsg(ctx.getDevice(), msg));
  212 +// }
211 213
212 private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) { 214 private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
213 if (quotaService.isQuotaExceeded(request.getRemoteAddr())) { 215 if (quotaService.isQuotaExceeded(request.getRemoteAddr())) {
@@ -29,6 +29,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService; @@ -29,6 +29,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService;
29 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 29 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
30 30
31 import java.util.Optional; 31 import java.util.Optional;
  32 +import java.util.UUID;
32 import java.util.function.Consumer; 33 import java.util.function.Consumer;
33 34
34 /** 35 /**
@@ -37,127 +38,136 @@ import java.util.function.Consumer; @@ -37,127 +38,136 @@ import java.util.function.Consumer;
37 @Slf4j 38 @Slf4j
38 public class HttpSessionCtx extends DeviceAwareSessionContext { 39 public class HttpSessionCtx extends DeviceAwareSessionContext {
39 40
40 - private final SessionId sessionId;  
41 - private final long timeout;  
42 - private final DeferredResult<ResponseEntity> responseWriter;  
43 -  
44 - public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult<ResponseEntity> responseWriter, long timeout) {  
45 - super();  
46 - this.sessionId = new HttpSessionId();  
47 - this.responseWriter = responseWriter;  
48 - this.timeout = timeout;  
49 - }  
50 -  
51 - @Override  
52 - public SessionType getSessionType() {  
53 - return SessionType.SYNC; 41 + public HttpSessionCtx(UUID sessionId) {
  42 + super(sessionId);
54 } 43 }
55 44
56 @Override 45 @Override
57 - public void onMsg(SessionActorToAdaptorMsg source) throws SessionException {  
58 - ToDeviceMsg msg = source.getMsg();  
59 - switch (msg.getSessionMsgType()) {  
60 - case GET_ATTRIBUTES_RESPONSE:  
61 - reply((GetAttributesResponse) msg);  
62 - return;  
63 - case STATUS_CODE_RESPONSE:  
64 - reply((StatusCodeResponse) msg);  
65 - return;  
66 - case ATTRIBUTES_UPDATE_NOTIFICATION:  
67 - reply((AttributesUpdateNotification) msg);  
68 - return;  
69 - case TO_DEVICE_RPC_REQUEST:  
70 - reply((ToDeviceRpcRequestMsg) msg);  
71 - return;  
72 - case TO_SERVER_RPC_RESPONSE:  
73 - reply((ToServerRpcResponseMsg) msg);  
74 - return;  
75 - case RULE_ENGINE_ERROR:  
76 - reply((RuleEngineErrorMsg) msg);  
77 - return;  
78 - default:  
79 - break;  
80 - }  
81 - }  
82 -  
83 - private void reply(RuleEngineErrorMsg msg) {  
84 - HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;  
85 - switch (msg.getError()) {  
86 - case QUEUE_PUT_TIMEOUT:  
87 - status = HttpStatus.REQUEST_TIMEOUT;  
88 - break;  
89 - default:  
90 - if (msg.getInSessionMsgType() == SessionMsgType.TO_SERVER_RPC_REQUEST) {  
91 - status = HttpStatus.BAD_REQUEST;  
92 - }  
93 - break;  
94 - }  
95 - responseWriter.setResult(new ResponseEntity<>(JsonConverter.toErrorJson(msg.getErrorMsg()).toString(), status));  
96 - }  
97 -  
98 - private <T> void reply(ResponseMsg<? extends T> msg, Consumer<T> f) {  
99 - Optional<Exception> msgError = msg.getError();  
100 - if (!msgError.isPresent()) {  
101 - Optional<? extends T> msgData = msg.getData();  
102 - if (msgData.isPresent()) {  
103 - f.accept(msgData.get());  
104 - }  
105 - } else {  
106 - Exception e = msgError.get();  
107 - responseWriter.setResult(new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR));  
108 - } 46 + public int nextMsgId() {
  47 + return 0;
109 } 48 }
110 49
111 - private void reply(ToDeviceRpcRequestMsg msg) {  
112 - responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));  
113 - }  
114 -  
115 - private void reply(ToServerRpcResponseMsg msg) {  
116 -// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));  
117 - }  
118 -  
119 - private void reply(AttributesUpdateNotification msg) {  
120 - responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg.getData(), false).toString(), HttpStatus.OK));  
121 - }  
122 -  
123 - private void reply(GetAttributesResponse msg) {  
124 - reply(msg, payload -> {  
125 - if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) {  
126 - responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND));  
127 - } else {  
128 - JsonObject result = JsonConverter.toJson(payload, false);  
129 - responseWriter.setResult(new ResponseEntity<>(result.toString(), HttpStatus.OK));  
130 - }  
131 - });  
132 - }  
133 -  
134 - private void reply(StatusCodeResponse msg) {  
135 - reply(msg, payload -> {  
136 - if (payload == 0) {  
137 - responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));  
138 - } else {  
139 - responseWriter.setResult(new ResponseEntity<>(HttpStatus.valueOf(payload)));  
140 - }  
141 - });  
142 - }  
143 -  
144 - @Override  
145 - public void onMsg(SessionCtrlMsg msg) throws SessionException {  
146 - //Do nothing  
147 - }  
148 -  
149 - @Override  
150 - public boolean isClosed() {  
151 - return false;  
152 - }  
153 -  
154 - @Override  
155 - public long getTimeout() {  
156 - return timeout;  
157 - }  
158 -  
159 - @Override  
160 - public SessionId getSessionId() {  
161 - return sessionId;  
162 - } 50 + // private final SessionId sessionId;
  51 +// private final long timeout;
  52 +// private final DeferredResult<ResponseEntity> responseWriter;
  53 +//
  54 +// public HttpSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, DeferredResult<ResponseEntity> responseWriter, long timeout) {
  55 +// super();
  56 +// this.sessionId = new HttpSessionId();
  57 +// this.responseWriter = responseWriter;
  58 +// this.timeout = timeout;
  59 +// }
  60 +//
  61 +// @Override
  62 +// public SessionType getSessionType() {
  63 +// return SessionType.SYNC;
  64 +// }
  65 +//
  66 +// @Override
  67 +// public void onMsg(SessionActorToAdaptorMsg source) throws SessionException {
  68 +// ToDeviceMsg msg = source.getMsg();
  69 +// switch (msg.getSessionMsgType()) {
  70 +// case GET_ATTRIBUTES_RESPONSE:
  71 +// reply((GetAttributesResponse) msg);
  72 +// return;
  73 +// case STATUS_CODE_RESPONSE:
  74 +// reply((StatusCodeResponse) msg);
  75 +// return;
  76 +// case ATTRIBUTES_UPDATE_NOTIFICATION:
  77 +// reply((AttributesUpdateNotification) msg);
  78 +// return;
  79 +// case TO_DEVICE_RPC_REQUEST:
  80 +// reply((ToDeviceRpcRequestMsg) msg);
  81 +// return;
  82 +// case TO_SERVER_RPC_RESPONSE:
  83 +// reply((ToServerRpcResponseMsg) msg);
  84 +// return;
  85 +// case RULE_ENGINE_ERROR:
  86 +// reply((RuleEngineErrorMsg) msg);
  87 +// return;
  88 +// default:
  89 +// break;
  90 +// }
  91 +// }
  92 +//
  93 +// private void reply(RuleEngineErrorMsg msg) {
  94 +// HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;
  95 +// switch (msg.getError()) {
  96 +// case QUEUE_PUT_TIMEOUT:
  97 +// status = HttpStatus.REQUEST_TIMEOUT;
  98 +// break;
  99 +// default:
  100 +// if (msg.getInSessionMsgType() == SessionMsgType.TO_SERVER_RPC_REQUEST) {
  101 +// status = HttpStatus.BAD_REQUEST;
  102 +// }
  103 +// break;
  104 +// }
  105 +// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toErrorJson(msg.getErrorMsg()).toString(), status));
  106 +// }
  107 +//
  108 +// private <T> void reply(ResponseMsg<? extends T> msg, Consumer<T> f) {
  109 +// Optional<Exception> msgError = msg.getError();
  110 +// if (!msgError.isPresent()) {
  111 +// Optional<? extends T> msgData = msg.getData();
  112 +// if (msgData.isPresent()) {
  113 +// f.accept(msgData.get());
  114 +// }
  115 +// } else {
  116 +// Exception e = msgError.get();
  117 +// responseWriter.setResult(new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR));
  118 +// }
  119 +// }
  120 +//
  121 +// private void reply(ToDeviceRpcRequestMsg msg) {
  122 +// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
  123 +// }
  124 +//
  125 +// private void reply(ToServerRpcResponseMsg msg) {
  126 +//// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
  127 +// }
  128 +//
  129 +// private void reply(AttributesUpdateNotification msg) {
  130 +// responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg.getData(), false).toString(), HttpStatus.OK));
  131 +// }
  132 +//
  133 +// private void reply(GetAttributesResponse msg) {
  134 +// reply(msg, payload -> {
  135 +// if (payload.getClientAttributes().isEmpty() && payload.getSharedAttributes().isEmpty()) {
  136 +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND));
  137 +// } else {
  138 +// JsonObject result = JsonConverter.toJson(payload, false);
  139 +// responseWriter.setResult(new ResponseEntity<>(result.toString(), HttpStatus.OK));
  140 +// }
  141 +// });
  142 +// }
  143 +//
  144 +// private void reply(StatusCodeResponse msg) {
  145 +// reply(msg, payload -> {
  146 +// if (payload == 0) {
  147 +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
  148 +// } else {
  149 +// responseWriter.setResult(new ResponseEntity<>(HttpStatus.valueOf(payload)));
  150 +// }
  151 +// });
  152 +// }
  153 +//
  154 +// @Override
  155 +// public void onMsg(SessionCtrlMsg msg) throws SessionException {
  156 +// //Do nothing
  157 +// }
  158 +//
  159 +// @Override
  160 +// public boolean isClosed() {
  161 +// return false;
  162 +// }
  163 +//
  164 +// @Override
  165 +// public long getTimeout() {
  166 +// return timeout;
  167 +// }
  168 +//
  169 +// @Override
  170 +// public SessionId getSessionId() {
  171 +// return sessionId;
  172 +// }
163 } 173 }
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.transport.http.session;  
17 -  
18 -import java.util.UUID;  
19 -  
20 -/**  
21 - * @author Andrew Shvayka  
22 - */  
23 -public class HttpSessionId implements SessionId {  
24 -  
25 - private final UUID id;  
26 -  
27 - public HttpSessionId() {  
28 - this.id = UUID.randomUUID();  
29 - }  
30 -  
31 - @Override  
32 - public String toUidStr() {  
33 - return id.toString();  
34 - }  
35 -}  
@@ -54,7 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenR @@ -54,7 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenR
54 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; 54 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
55 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; 55 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
56 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; 56 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
57 -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; 57 +import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
58 import org.thingsboard.server.transport.mqtt.util.SslUtil; 58 import org.thingsboard.server.transport.mqtt.util.SslUtil;
59 59
60 import javax.net.ssl.SSLPeerUnverifiedException; 60 import javax.net.ssl.SSLPeerUnverifiedException;
@@ -98,7 +98,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -98,7 +98,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
98 private volatile SessionInfoProto sessionInfo; 98 private volatile SessionInfoProto sessionInfo;
99 private volatile InetSocketAddress address; 99 private volatile InetSocketAddress address;
100 private volatile DeviceSessionCtx deviceSessionCtx; 100 private volatile DeviceSessionCtx deviceSessionCtx;
101 - private volatile GatewaySessionCtx gatewaySessionCtx; 101 + private volatile GatewaySessionHandler gatewaySessionHandler;
102 102
103 MqttTransportHandler(MqttTransportContext context) { 103 MqttTransportHandler(MqttTransportContext context) {
104 this.sessionId = UUID.randomUUID(); 104 this.sessionId = UUID.randomUUID();
@@ -175,7 +175,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -175,7 +175,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
175 log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId); 175 log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId);
176 176
177 if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { 177 if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
178 - if (gatewaySessionCtx != null) { 178 + if (gatewaySessionHandler != null) {
179 handleGatewayPublishMsg(topicName, msgId, mqttMsg); 179 handleGatewayPublishMsg(topicName, msgId, mqttMsg);
180 } 180 }
181 } else { 181 } else {
@@ -187,22 +187,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -187,22 +187,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
187 try { 187 try {
188 switch (topicName) { 188 switch (topicName) {
189 case MqttTopics.GATEWAY_TELEMETRY_TOPIC: 189 case MqttTopics.GATEWAY_TELEMETRY_TOPIC:
190 - gatewaySessionCtx.onDeviceTelemetry(mqttMsg); 190 + gatewaySessionHandler.onDeviceTelemetry(mqttMsg);
191 break; 191 break;
192 case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: 192 case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
193 - gatewaySessionCtx.onDeviceAttributes(mqttMsg); 193 + gatewaySessionHandler.onDeviceAttributes(mqttMsg);
194 break; 194 break;
195 case MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC: 195 case MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
196 - gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); 196 + gatewaySessionHandler.onDeviceAttributesRequest(mqttMsg);
197 break; 197 break;
198 case MqttTopics.GATEWAY_RPC_TOPIC: 198 case MqttTopics.GATEWAY_RPC_TOPIC:
199 - gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); 199 + gatewaySessionHandler.onDeviceRpcResponse(mqttMsg);
200 break; 200 break;
201 case MqttTopics.GATEWAY_CONNECT_TOPIC: 201 case MqttTopics.GATEWAY_CONNECT_TOPIC:
202 - gatewaySessionCtx.onDeviceConnect(mqttMsg); 202 + gatewaySessionHandler.onDeviceConnect(mqttMsg);
203 break; 203 break;
204 case MqttTopics.GATEWAY_DISCONNECT_TOPIC: 204 case MqttTopics.GATEWAY_DISCONNECT_TOPIC:
205 - gatewaySessionCtx.onDeviceDisconnect(mqttMsg); 205 + gatewaySessionHandler.onDeviceDisconnect(mqttMsg);
206 break; 206 break;
207 } 207 }
208 } catch (RuntimeException | AdaptorException e) { 208 } catch (RuntimeException | AdaptorException e) {
@@ -405,8 +405,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -405,8 +405,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
405 if (deviceSessionCtx.isConnected()) { 405 if (deviceSessionCtx.isConnected()) {
406 transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null); 406 transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null);
407 transportService.deregisterSession(sessionInfo); 407 transportService.deregisterSession(sessionInfo);
408 - if (gatewaySessionCtx != null) {  
409 - gatewaySessionCtx.onGatewayDisconnect(); 408 + if (gatewaySessionHandler != null) {
  409 + gatewaySessionHandler.onGatewayDisconnect();
410 } 410 }
411 } 411 }
412 } 412 }
@@ -467,7 +467,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -467,7 +467,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
467 if (infoNode != null) { 467 if (infoNode != null) {
468 JsonNode gatewayNode = infoNode.get("gateway"); 468 JsonNode gatewayNode = infoNode.get("gateway");
469 if (gatewayNode != null && gatewayNode.asBoolean()) { 469 if (gatewayNode != null && gatewayNode.asBoolean()) {
470 - gatewaySessionCtx = new GatewaySessionCtx(context, deviceSessionCtx, sessionId); 470 + gatewaySessionHandler = new GatewaySessionHandler(context, deviceSessionCtx, sessionId);
471 } 471 }
472 } 472 }
473 } catch (IOException e) { 473 } catch (IOException e) {
@@ -30,10 +30,10 @@ import java.util.concurrent.ConcurrentMap; @@ -30,10 +30,10 @@ import java.util.concurrent.ConcurrentMap;
30 @Slf4j 30 @Slf4j
31 public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { 31 public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
32 32
33 - private final GatewaySessionCtx parent; 33 + private final GatewaySessionHandler parent;
34 private final SessionInfoProto sessionInfo; 34 private final SessionInfoProto sessionInfo;
35 35
36 - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, DeviceInfoProto deviceInfo, ConcurrentMap<String, Integer> mqttQoSMap) { 36 + public GatewayDeviceSessionCtx(GatewaySessionHandler parent, DeviceInfoProto deviceInfo, ConcurrentMap<String, Integer> mqttQoSMap) {
37 super(UUID.randomUUID(), mqttQoSMap); 37 super(UUID.randomUUID(), mqttQoSMap);
38 this.parent = parent; 38 this.parent = parent;
39 this.sessionInfo = SessionInfoProto.newBuilder() 39 this.sessionInfo = SessionInfoProto.newBuilder()
transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java renamed from transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java
@@ -52,13 +52,12 @@ import java.util.Set; @@ -52,13 +52,12 @@ import java.util.Set;
52 import java.util.UUID; 52 import java.util.UUID;
53 import java.util.concurrent.ConcurrentHashMap; 53 import java.util.concurrent.ConcurrentHashMap;
54 import java.util.concurrent.ConcurrentMap; 54 import java.util.concurrent.ConcurrentMap;
55 -import java.util.concurrent.atomic.AtomicInteger;  
56 55
57 /** 56 /**
58 * Created by ashvayka on 19.01.17. 57 * Created by ashvayka on 19.01.17.
59 */ 58 */
60 @Slf4j 59 @Slf4j
61 -public class GatewaySessionCtx { 60 +public class GatewaySessionHandler {
62 61
63 private static final String DEFAULT_DEVICE_TYPE = "default"; 62 private static final String DEFAULT_DEVICE_TYPE = "default";
64 private static final String CAN_T_PARSE_VALUE = "Can't parse value: "; 63 private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
@@ -73,7 +72,7 @@ public class GatewaySessionCtx { @@ -73,7 +72,7 @@ public class GatewaySessionCtx {
73 private final ChannelHandlerContext channel; 72 private final ChannelHandlerContext channel;
74 private final DeviceSessionCtx deviceSessionCtx; 73 private final DeviceSessionCtx deviceSessionCtx;
75 74
76 - public GatewaySessionCtx(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) { 75 + public GatewaySessionHandler(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
77 this.context = context; 76 this.context = context;
78 this.transportService = context.getTransportService(); 77 this.transportService = context.getTransportService();
79 this.deviceSessionCtx = deviceSessionCtx; 78 this.deviceSessionCtx = deviceSessionCtx;
@@ -114,10 +113,12 @@ public class GatewaySessionCtx { @@ -114,10 +113,12 @@ public class GatewaySessionCtx {
114 new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() { 113 new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {
115 @Override 114 @Override
116 public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) { 115 public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {
117 - GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionCtx.this, msg.getDeviceInfo(), mqttQoSMap); 116 + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap);
118 if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { 117 if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
119 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); 118 SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
120 transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); 119 transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
  120 + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
  121 + transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
121 transportService.registerSession(deviceSessionInfo, deviceSessionCtx); 122 transportService.registerSession(deviceSessionInfo, deviceSessionCtx);
122 } 123 }
123 future.set(devices.get(deviceName)); 124 future.set(devices.get(deviceName));
@@ -203,7 +204,7 @@ public class GatewaySessionCtx { @@ -203,7 +204,7 @@ public class GatewaySessionCtx {
203 204
204 @Override 205 @Override
205 public void onFailure(Throwable t) { 206 public void onFailure(Throwable t) {
206 - log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t); 207 + log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
207 } 208 }
208 }, context.getExecutor()); 209 }, context.getExecutor());
209 } 210 }
@@ -264,8 +265,8 @@ public class GatewaySessionCtx { @@ -264,8 +265,8 @@ public class GatewaySessionCtx {
264 } else { 265 } else {
265 result.addAllSharedAttributeNames(keys); 266 result.addAllSharedAttributeNames(keys);
266 } 267 }
267 - int msgId = msg.variableHeader().packetId();  
268 TransportProtos.GetAttributeRequestMsg requestMsg = result.build(); 268 TransportProtos.GetAttributeRequestMsg requestMsg = result.build();
  269 + int msgId = msg.variableHeader().packetId();
269 Futures.addCallback(checkDeviceConnected(deviceName), 270 Futures.addCallback(checkDeviceConnected(deviceName),
270 new FutureCallback<GatewayDeviceSessionCtx>() { 271 new FutureCallback<GatewayDeviceSessionCtx>() {
271 @Override 272 @Override
@@ -275,7 +276,7 @@ public class GatewaySessionCtx { @@ -275,7 +276,7 @@ public class GatewaySessionCtx {
275 276
276 @Override 277 @Override
277 public void onFailure(Throwable t) { 278 public void onFailure(Throwable t) {
278 - log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t); 279 + log.debug("[{}] Failed to process device attributes request command: {}", sessionId, deviceName, t);
279 } 280 }
280 }, context.getExecutor()); 281 }, context.getExecutor());
281 ack(msg); 282 ack(msg);
@@ -35,7 +35,7 @@ @@ -35,7 +35,7 @@
35 </properties> 35 </properties>
36 36
37 <modules> 37 <modules>
38 - <!--<module>http</module>--> 38 + <module>http</module>
39 <!--<module>coap</module>--> 39 <!--<module>coap</module>-->
40 <module>mqtt-common</module> 40 <module>mqtt-common</module>
41 <module>mqtt-transport</module> 41 <module>mqtt-transport</module>