Commit e5a1fb58654cb5c6b03e8007eae34f5422625a7e

Authored by Andrew Shvayka
1 parent 950ec8d6

Refactoring of Gateway API

Showing 30 changed files with 635 additions and 559 deletions
@@ -411,29 +411,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso @@ -411,29 +411,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
411 } 411 }
412 } 412 }
413 413
414 -// private void processRpcResponses(ActorContext context, DeviceToDeviceActorMsg msg) {  
415 -// SessionId sessionId = msg.getSessionId();  
416 -// FromDeviceMsg inMsg = msg.getPayload();  
417 -// if (inMsg.getMsgType() == SessionMsgType.TO_DEVICE_RPC_RESPONSE) {  
418 -// logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);  
419 -// ToDeviceRpcResponseMsg responseMsg = (ToDeviceRpcResponseMsg) inMsg;  
420 -// ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());  
421 -// boolean success = requestMd != null;  
422 -// if (success) {  
423 -// systemContext.getDeviceRpcService().processRpcResponseFromDevice(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),  
424 -// requestMd.getMsg().getServerAddress(), responseMsg.getData(), null));  
425 -// } else {  
426 -// logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());  
427 -// }  
428 -// if (msg.getSessionType() == SessionType.SYNC) {  
429 -// BasicCommandAckResponse response = success  
430 -// ? BasicCommandAckResponse.onSuccess(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId())  
431 -// : BasicCommandAckResponse.onError(SessionMsgType.TO_DEVICE_RPC_REQUEST, responseMsg.getRequestId(), new TimeoutException());  
432 -// sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, msg.getSessionId()), msg.getServerAddress());  
433 -// }  
434 -// }  
435 -// }  
436 -  
437 void processClusterEventMsg(ClusterEventMsg msg) { 414 void processClusterEventMsg(ClusterEventMsg msg) {
438 // if (!msg.isAdded()) { 415 // if (!msg.isAdded()) {
439 // logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress()); 416 // logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress());
@@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
16 package org.thingsboard.server.actors.shared; 16 package org.thingsboard.server.actors.shared;
17 17
18 import lombok.Data; 18 import lombok.Data;
19 -import org.thingsboard.server.common.data.id.SessionId;  
20 import org.thingsboard.server.common.msg.MsgType; 19 import org.thingsboard.server.common.msg.MsgType;
21 import org.thingsboard.server.common.msg.TbActorMsg; 20 import org.thingsboard.server.common.msg.TbActorMsg;
22 21
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,11 +24,16 @@ import org.springframework.beans.factory.annotation.Autowired; @@ -24,11 +24,16 @@ import org.springframework.beans.factory.annotation.Autowired;
24 import org.springframework.beans.factory.annotation.Value; 24 import org.springframework.beans.factory.annotation.Value;
25 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 25 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
26 import org.springframework.stereotype.Service; 26 import org.springframework.stereotype.Service;
  27 +import org.thingsboard.server.common.data.Device;
27 import org.thingsboard.server.common.data.id.DeviceId; 28 import org.thingsboard.server.common.data.id.DeviceId;
  29 +import org.thingsboard.server.common.data.relation.EntityRelation;
28 import org.thingsboard.server.common.data.security.DeviceCredentials; 30 import org.thingsboard.server.common.data.security.DeviceCredentials;
29 import org.thingsboard.server.common.data.security.DeviceCredentialsType; 31 import org.thingsboard.server.common.data.security.DeviceCredentialsType;
30 import org.thingsboard.server.dao.device.DeviceCredentialsService; 32 import org.thingsboard.server.dao.device.DeviceCredentialsService;
31 import org.thingsboard.server.dao.device.DeviceService; 33 import org.thingsboard.server.dao.device.DeviceService;
  34 +import org.thingsboard.server.dao.relation.RelationService;
  35 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
  36 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
32 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; 37 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
33 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; 38 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
34 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; 39 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@@ -40,8 +45,10 @@ import org.thingsboard.server.kafka.TBKafkaProducerTemplate; @@ -40,8 +45,10 @@ import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
40 import org.thingsboard.server.kafka.TbKafkaResponseTemplate; 45 import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
41 import org.thingsboard.server.kafka.TbKafkaSettings; 46 import org.thingsboard.server.kafka.TbKafkaSettings;
42 import org.thingsboard.server.service.cluster.discovery.DiscoveryService; 47 import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
  48 +import org.thingsboard.server.service.state.DeviceStateService;
43 49
44 import javax.annotation.PostConstruct; 50 import javax.annotation.PostConstruct;
  51 +import java.util.UUID;
45 import java.util.concurrent.ExecutorService; 52 import java.util.concurrent.ExecutorService;
46 import java.util.concurrent.Executors; 53 import java.util.concurrent.Executors;
47 54
@@ -78,8 +85,14 @@ public class RemoteTransportApiService implements TransportApiService { @@ -78,8 +85,14 @@ public class RemoteTransportApiService implements TransportApiService {
78 private DeviceService deviceService; 85 private DeviceService deviceService;
79 86
80 @Autowired 87 @Autowired
  88 + private RelationService relationService;
  89 +
  90 + @Autowired
81 private DeviceCredentialsService deviceCredentialsService; 91 private DeviceCredentialsService deviceCredentialsService;
82 92
  93 + @Autowired
  94 + private DeviceStateService deviceStateService;
  95 +
83 private ExecutorService transportCallbackExecutor; 96 private ExecutorService transportCallbackExecutor;
84 97
85 private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate; 98 private TbKafkaResponseTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
@@ -123,6 +136,8 @@ public class RemoteTransportApiService implements TransportApiService { @@ -123,6 +136,8 @@ public class RemoteTransportApiService implements TransportApiService {
123 } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) { 136 } else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
124 ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg(); 137 ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
125 return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE); 138 return validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
  139 + } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
  140 + return handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
126 } 141 }
127 return getEmptyTransportApiResponseFuture(); 142 return getEmptyTransportApiResponseFuture();
128 } 143 }
@@ -137,6 +152,32 @@ public class RemoteTransportApiService implements TransportApiService { @@ -137,6 +152,32 @@ public class RemoteTransportApiService implements TransportApiService {
137 } 152 }
138 } 153 }
139 154
  155 + private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
  156 + DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
  157 + ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(gatewayId);
  158 + return Futures.transform(gatewayFuture, gateway -> {
  159 + Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), gateway.getName());
  160 + if (device == null) {
  161 + device = new Device();
  162 + device.setTenantId(gateway.getTenantId());
  163 + device.setName(requestMsg.getDeviceName());
  164 + device.setType(requestMsg.getDeviceType());
  165 + device.setCustomerId(gateway.getCustomerId());
  166 + device = deviceService.saveDevice(device);
  167 + relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
  168 + deviceStateService.onDeviceAdded(device);
  169 + }
  170 + try {
  171 + return TransportApiResponseMsg.newBuilder()
  172 + .setGetOrCreateDeviceResponseMsg(GetOrCreateDeviceFromGatewayResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
  173 + } catch (JsonProcessingException e) {
  174 + log.warn("[{}] Failed to lookup device by gateway id and name", gatewayId, requestMsg.getDeviceName(), e);
  175 + throw new RuntimeException(e);
  176 + }
  177 + }, transportCallbackExecutor);
  178 + }
  179 +
  180 +
140 private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) { 181 private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId) {
141 return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> { 182 return Futures.transform(deviceService.findDeviceByIdAsync(deviceId), device -> {
142 if (device == null) { 183 if (device == null) {
@@ -144,17 +185,8 @@ public class RemoteTransportApiService implements TransportApiService { @@ -144,17 +185,8 @@ public class RemoteTransportApiService implements TransportApiService {
144 return getEmptyTransportApiResponse(); 185 return getEmptyTransportApiResponse();
145 } 186 }
146 try { 187 try {
147 - DeviceInfoProto deviceInfoProto = DeviceInfoProto.newBuilder()  
148 - .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())  
149 - .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())  
150 - .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())  
151 - .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())  
152 - .setDeviceName(device.getName())  
153 - .setDeviceType(device.getType())  
154 - .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))  
155 - .build();  
156 return TransportApiResponseMsg.newBuilder() 188 return TransportApiResponseMsg.newBuilder()
157 - .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(deviceInfoProto).build()).build(); 189 + .setValidateTokenResponseMsg(ValidateDeviceCredentialsResponseMsg.newBuilder().setDeviceInfo(getDeviceInfoProto(device)).build()).build();
158 } catch (JsonProcessingException e) { 190 } catch (JsonProcessingException e) {
159 log.warn("[{}] Failed to lookup device by id", deviceId, e); 191 log.warn("[{}] Failed to lookup device by id", deviceId, e);
160 return getEmptyTransportApiResponse(); 192 return getEmptyTransportApiResponse();
@@ -162,6 +194,18 @@ public class RemoteTransportApiService implements TransportApiService { @@ -162,6 +194,18 @@ public class RemoteTransportApiService implements TransportApiService {
162 }); 194 });
163 } 195 }
164 196
  197 + private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {
  198 + return DeviceInfoProto.newBuilder()
  199 + .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
  200 + .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
  201 + .setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
  202 + .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
  203 + .setDeviceName(device.getName())
  204 + .setDeviceType(device.getType())
  205 + .setAdditionalInfo(mapper.writeValueAsString(device.getAdditionalInfo()))
  206 + .build();
  207 + }
  208 +
165 private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() { 209 private ListenableFuture<TransportApiResponseMsg> getEmptyTransportApiResponseFuture() {
166 return Futures.immediateFuture(getEmptyTransportApiResponse()); 210 return Futures.immediateFuture(getEmptyTransportApiResponse());
167 } 211 }
1 -/**  
2 - * Copyright © 2016-2018 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.common.data.id;  
17 -  
18 -import java.io.Serializable;  
19 -  
20 -public interface SessionId extends Serializable {  
21 -  
22 - String toUidStr();  
23 -  
24 -}  
@@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.common.msg.aware; 16 package org.thingsboard.server.common.msg.aware;
17 17
18 -import org.thingsboard.server.common.data.id.SessionId;  
19 -  
20 public interface SessionAwareMsg { 18 public interface SessionAwareMsg {
21 19
22 SessionId getSessionId(); 20 SessionId getSessionId();
@@ -16,11 +16,8 @@ @@ -16,11 +16,8 @@
16 package org.thingsboard.server.common.msg.core; 16 package org.thingsboard.server.common.msg.core;
17 17
18 import lombok.Data; 18 import lombok.Data;
19 -import org.thingsboard.server.common.data.id.SessionId;  
20 -import org.thingsboard.server.common.msg.aware.SessionAwareMsg;  
21 import org.thingsboard.server.common.msg.session.FromDeviceMsg; 19 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
22 import org.thingsboard.server.common.msg.session.SessionMsgType; 20 import org.thingsboard.server.common.msg.session.SessionMsgType;
23 -import org.thingsboard.server.common.msg.session.SessionMsgType;  
24 21
25 /** 22 /**
26 * @author Andrew Shvayka 23 * @author Andrew Shvayka
@@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.common.msg.session; 16 package org.thingsboard.server.common.msg.session;
17 17
18 -import org.thingsboard.server.common.data.id.SessionId;  
19 -  
20 public class BasicSessionMsg implements SessionMsg { 18 public class BasicSessionMsg implements SessionMsg {
21 19
22 private final SessionContext ctx; 20 private final SessionContext ctx;
@@ -18,7 +18,6 @@ package org.thingsboard.server.common.msg.session; @@ -18,7 +18,6 @@ package org.thingsboard.server.common.msg.session;
18 import org.thingsboard.server.common.data.Device; 18 import org.thingsboard.server.common.data.Device;
19 import org.thingsboard.server.common.data.id.CustomerId; 19 import org.thingsboard.server.common.data.id.CustomerId;
20 import org.thingsboard.server.common.data.id.DeviceId; 20 import org.thingsboard.server.common.data.id.DeviceId;
21 -import org.thingsboard.server.common.data.id.SessionId;  
22 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
23 import org.thingsboard.server.common.msg.MsgType; 22 import org.thingsboard.server.common.msg.MsgType;
24 23
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,21 +15,11 @@ @@ -15,21 +15,11 @@
15 */ 15 */
16 package org.thingsboard.server.common.msg.session; 16 package org.thingsboard.server.common.msg.session;
17 17
18 -import org.thingsboard.server.common.data.id.SessionId;  
19 -import org.thingsboard.server.common.msg.session.ex.SessionException; 18 +import java.util.UUID;
20 19
21 public interface SessionContext { 20 public interface SessionContext {
22 21
23 - SessionId getSessionId();  
24 -  
25 - SessionType getSessionType();  
26 -  
27 - void onMsg(SessionActorToAdaptorMsg msg) throws SessionException;  
28 -  
29 - void onMsg(SessionCtrlMsg msg) throws SessionException;  
30 -  
31 - boolean isClosed();  
32 -  
33 - long getTimeout(); 22 + UUID getSessionId();
34 23
  24 + int nextMsgId();
35 } 25 }
@@ -15,7 +15,6 @@ @@ -15,7 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.common.msg.session.ctrl; 16 package org.thingsboard.server.common.msg.session.ctrl;
17 17
18 -import org.thingsboard.server.common.data.id.SessionId;  
19 import org.thingsboard.server.common.msg.MsgType; 18 import org.thingsboard.server.common.msg.MsgType;
20 import org.thingsboard.server.common.msg.session.SessionCtrlMsg; 19 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
21 20
@@ -15,19 +15,8 @@ @@ -15,19 +15,8 @@
15 */ 15 */
16 package org.thingsboard.server.common.transport; 16 package org.thingsboard.server.common.transport;
17 17
18 -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;  
19 -import org.thingsboard.server.common.msg.session.SessionMsgType;  
20 -import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;  
21 import org.thingsboard.server.common.msg.session.SessionContext; 18 import org.thingsboard.server.common.msg.session.SessionContext;
22 -import org.thingsboard.server.common.transport.adaptor.AdaptorException;  
23 -import org.thingsboard.server.gen.transport.TransportProtos;  
24 -  
25 -import java.util.Optional;  
26 19
27 public interface TransportAdaptor<C extends SessionContext, T, V> { 20 public interface TransportAdaptor<C extends SessionContext, T, V> {
28 21
29 - AdaptorToSessionActorMsg convertToActorMsg(C ctx, SessionMsgType type, T inbound) throws AdaptorException;  
30 -  
31 - Optional<V> convertToAdaptorMsg(C ctx, SessionActorToAdaptorMsg msg) throws AdaptorException;  
32 -  
33 } 22 }
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.common.transport; 16 package org.thingsboard.server.common.transport;
17 17
  18 +import org.thingsboard.server.gen.transport.TransportProtos;
18 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; 19 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
19 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; 20 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
20 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; 21 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
@@ -39,6 +40,9 @@ public interface TransportService { @@ -39,6 +40,9 @@ public interface TransportService {
39 void process(ValidateDeviceX509CertRequestMsg msg, 40 void process(ValidateDeviceX509CertRequestMsg msg,
40 TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback); 41 TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
41 42
  43 + void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
  44 + TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
  45 +
42 void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback); 46 void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
43 47
44 void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback); 48 void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,7 +52,8 @@ import java.util.stream.Collectors; @@ -52,7 +52,8 @@ import java.util.stream.Collectors;
52 public class JsonConverter { 52 public class JsonConverter {
53 53
54 private static final Gson GSON = new Gson(); 54 private static final Gson GSON = new Gson();
55 - public static final String CAN_T_PARSE_VALUE = "Can't parse value: "; 55 + private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
  56 + private static final String DEVICE_PROPERTY = "device";
56 57
57 public static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject) throws JsonSyntaxException { 58 public static TelemetryUploadRequest convertToTelemetry(JsonElement jsonObject) throws JsonSyntaxException {
58 return convertToTelemetry(jsonObject, BasicRequest.DEFAULT_REQUEST_ID); 59 return convertToTelemetry(jsonObject, BasicRequest.DEFAULT_REQUEST_ID);
@@ -318,6 +319,57 @@ public class JsonConverter { @@ -318,6 +319,57 @@ public class JsonConverter {
318 return result; 319 return result;
319 } 320 }
320 321
  322 + public static JsonObject getJsonObjectForGateway(TransportProtos.GetAttributeResponseMsg responseMsg) {
  323 + JsonObject result = new JsonObject();
  324 + result.addProperty("id", responseMsg.getRequestId());
  325 + if (responseMsg.getClientAttributeListCount() > 0) {
  326 + addValues(result, responseMsg.getClientAttributeListList());
  327 + }
  328 + if (responseMsg.getSharedAttributeListCount() > 0) {
  329 + addValues(result, responseMsg.getSharedAttributeListList());
  330 + }
  331 + return result;
  332 + }
  333 +
  334 + public static JsonObject getJsonObjectForGateway(String deviceName, AttributeUpdateNotificationMsg notificationMsg) {
  335 + JsonObject result = new JsonObject();
  336 + result.addProperty(DEVICE_PROPERTY, deviceName);
  337 + result.add("data", toJson(notificationMsg));
  338 + return result;
  339 + }
  340 +
  341 + private static void addValues(JsonObject result, List<TransportProtos.TsKvProto> kvList) {
  342 + if (kvList.size() == 1) {
  343 + addValueToJson(result, "value", kvList.get(0).getKv());
  344 + } else {
  345 + JsonObject values;
  346 + if (result.has("values")) {
  347 + values = result.get("values").getAsJsonObject();
  348 + } else {
  349 + values = new JsonObject();
  350 + result.add("values", values);
  351 + }
  352 + kvList.forEach(value -> addValueToJson(values, value.getKv().getKey(), value.getKv()));
  353 + }
  354 + }
  355 +
  356 + private static void addValueToJson(JsonObject json, String name, TransportProtos.KeyValueProto entry) {
  357 + switch (entry.getType()) {
  358 + case BOOLEAN_V:
  359 + json.addProperty(name, entry.getBoolV());
  360 + break;
  361 + case STRING_V:
  362 + json.addProperty(name, entry.getStringV());
  363 + break;
  364 + case DOUBLE_V:
  365 + json.addProperty(name, entry.getDoubleV());
  366 + break;
  367 + case LONG_V:
  368 + json.addProperty(name, entry.getLongV());
  369 + break;
  370 + }
  371 + }
  372 +
321 private static Consumer<AttributeKey> addToObject(JsonArray result) { 373 private static Consumer<AttributeKey> addToObject(JsonArray result) {
322 return key -> result.add(key.getAttributeKey()); 374 return key -> result.add(key.getAttributeKey());
323 } 375 }
@@ -31,16 +31,9 @@ public abstract class DeviceAwareSessionContext implements SessionContext { @@ -31,16 +31,9 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
31 31
32 @Getter 32 @Getter
33 private volatile DeviceId deviceId; 33 private volatile DeviceId deviceId;
  34 + @Getter
34 private volatile DeviceInfoProto deviceInfo; 35 private volatile DeviceInfoProto deviceInfo;
35 36
36 - public long getDeviceIdMSB() {  
37 - return deviceInfo.getDeviceIdMSB();  
38 - }  
39 -  
40 - public long getDeviceIdLSB() {  
41 - return deviceInfo.getDeviceIdLSB();  
42 - }  
43 -  
44 public DeviceId getDeviceId() { 37 public DeviceId getDeviceId() {
45 return deviceId; 38 return deviceId;
46 } 39 }
@@ -125,6 +125,17 @@ message ValidateDeviceCredentialsResponseMsg { @@ -125,6 +125,17 @@ message ValidateDeviceCredentialsResponseMsg {
125 DeviceInfoProto deviceInfo = 1; 125 DeviceInfoProto deviceInfo = 1;
126 } 126 }
127 127
  128 +message GetOrCreateDeviceFromGatewayRequestMsg {
  129 + int64 gatewayIdMSB = 1;
  130 + int64 gatewayIdLSB = 2;
  131 + string deviceName = 3;
  132 + string deviceType = 4;
  133 +}
  134 +
  135 +message GetOrCreateDeviceFromGatewayResponseMsg {
  136 + DeviceInfoProto deviceInfo = 1;
  137 +}
  138 +
128 message SessionCloseNotificationProto { 139 message SessionCloseNotificationProto {
129 string message = 1; 140 string message = 1;
130 } 141 }
@@ -196,8 +207,10 @@ message ToTransportMsg { @@ -196,8 +207,10 @@ message ToTransportMsg {
196 message TransportApiRequestMsg { 207 message TransportApiRequestMsg {
197 ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1; 208 ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
198 ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2; 209 ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2;
  210 + GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3;
199 } 211 }
200 212
201 message TransportApiResponseMsg { 213 message TransportApiResponseMsg {
202 ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1; 214 ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
  215 + GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2;
203 } 216 }
@@ -27,7 +27,6 @@ import org.eclipse.californium.core.network.Exchange; @@ -27,7 +27,6 @@ import org.eclipse.californium.core.network.Exchange;
27 import org.eclipse.californium.core.network.ExchangeObserver; 27 import org.eclipse.californium.core.network.ExchangeObserver;
28 import org.eclipse.californium.core.server.resources.CoapExchange; 28 import org.eclipse.californium.core.server.resources.CoapExchange;
29 import org.eclipse.californium.core.server.resources.Resource; 29 import org.eclipse.californium.core.server.resources.Resource;
30 -import org.thingsboard.server.common.data.id.SessionId;  
31 import org.thingsboard.server.common.data.security.DeviceCredentialsFilter; 30 import org.thingsboard.server.common.data.security.DeviceCredentialsFilter;
32 import org.thingsboard.server.common.data.security.DeviceTokenCredentials; 31 import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
33 import org.thingsboard.server.common.msg.session.*; 32 import org.thingsboard.server.common.msg.session.*;
@@ -20,20 +20,16 @@ import org.eclipse.californium.core.coap.CoAP.ResponseCode; @@ -20,20 +20,16 @@ import org.eclipse.californium.core.coap.CoAP.ResponseCode;
20 import org.eclipse.californium.core.coap.Request; 20 import org.eclipse.californium.core.coap.Request;
21 import org.eclipse.californium.core.coap.Response; 21 import org.eclipse.californium.core.coap.Response;
22 import org.eclipse.californium.core.server.resources.CoapExchange; 22 import org.eclipse.californium.core.server.resources.CoapExchange;
23 -import org.thingsboard.server.common.data.id.SessionId;  
24 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; 23 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
25 import org.thingsboard.server.common.msg.session.SessionCtrlMsg; 24 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
26 import org.thingsboard.server.common.msg.session.SessionType; 25 import org.thingsboard.server.common.msg.session.SessionType;
27 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; 26 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
28 -import org.thingsboard.server.common.msg.session.ex.SessionAuthException;  
29 import org.thingsboard.server.common.msg.session.ex.SessionException; 27 import org.thingsboard.server.common.msg.session.ex.SessionException;
30 import org.thingsboard.server.common.transport.SessionMsgProcessor; 28 import org.thingsboard.server.common.transport.SessionMsgProcessor;
31 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 29 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
32 import org.thingsboard.server.common.transport.auth.DeviceAuthService; 30 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
33 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 31 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
34 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; 32 import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor;
35 -import org.slf4j.Logger;  
36 -import org.slf4j.LoggerFactory;  
37 33
38 import java.util.concurrent.atomic.AtomicInteger; 34 import java.util.concurrent.atomic.AtomicInteger;
39 35
@@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.transport.coap.session; 16 package org.thingsboard.server.transport.coap.session;
17 17
18 -import org.thingsboard.server.common.data.id.SessionId;  
19 -  
20 public final class CoapSessionId implements SessionId { 18 public final class CoapSessionId implements SessionId {
21 19
22 private final String clientAddress; 20 private final String clientAddress;
@@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j; @@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j;
20 import org.springframework.http.HttpStatus; 20 import org.springframework.http.HttpStatus;
21 import org.springframework.http.ResponseEntity; 21 import org.springframework.http.ResponseEntity;
22 import org.springframework.web.context.request.async.DeferredResult; 22 import org.springframework.web.context.request.async.DeferredResult;
23 -import org.thingsboard.server.common.data.id.SessionId;  
24 import org.thingsboard.server.common.msg.core.*; 23 import org.thingsboard.server.common.msg.core.*;
25 import org.thingsboard.server.common.msg.session.*; 24 import org.thingsboard.server.common.msg.session.*;
26 import org.thingsboard.server.common.msg.session.ex.SessionException; 25 import org.thingsboard.server.common.msg.session.ex.SessionException;
@@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.transport.http.session; 16 package org.thingsboard.server.transport.http.session;
17 17
18 -import org.thingsboard.server.common.data.id.SessionId;  
19 -  
20 import java.util.UUID; 18 import java.util.UUID;
21 19
22 /** 20 /**
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt; @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt;
18 import com.fasterxml.jackson.databind.ObjectMapper; 18 import com.fasterxml.jackson.databind.ObjectMapper;
19 import io.netty.handler.ssl.SslHandler; 19 import io.netty.handler.ssl.SslHandler;
20 import lombok.Data; 20 import lombok.Data;
  21 +import lombok.Getter;
21 import lombok.extern.slf4j.Slf4j; 22 import lombok.extern.slf4j.Slf4j;
22 import org.apache.commons.lang3.RandomStringUtils; 23 import org.apache.commons.lang3.RandomStringUtils;
23 import org.springframework.beans.factory.annotation.Autowired; 24 import org.springframework.beans.factory.annotation.Autowired;
@@ -30,8 +31,11 @@ import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaServi @@ -30,8 +31,11 @@ import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaServi
30 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; 31 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
31 32
32 import javax.annotation.PostConstruct; 33 import javax.annotation.PostConstruct;
  34 +import javax.annotation.PreDestroy;
33 import java.net.InetAddress; 35 import java.net.InetAddress;
34 import java.net.UnknownHostException; 36 import java.net.UnknownHostException;
  37 +import java.util.concurrent.ExecutorService;
  38 +import java.util.concurrent.Executors;
35 39
36 /** 40 /**
37 * Created by ashvayka on 04.10.18. 41 * Created by ashvayka on 04.10.18.
@@ -64,6 +68,9 @@ public class MqttTransportContext { @@ -64,6 +68,9 @@ public class MqttTransportContext {
64 68
65 private SslHandler sslHandler; 69 private SslHandler sslHandler;
66 70
  71 + @Getter
  72 + private ExecutorService executor;
  73 +
67 @PostConstruct 74 @PostConstruct
68 public void init() { 75 public void init() {
69 if (StringUtils.isEmpty(nodeId)) { 76 if (StringUtils.isEmpty(nodeId)) {
@@ -74,6 +81,14 @@ public class MqttTransportContext { @@ -74,6 +81,14 @@ public class MqttTransportContext {
74 } 81 }
75 } 82 }
76 log.info("Current NodeId: {}", nodeId); 83 log.info("Current NodeId: {}", nodeId);
  84 + executor = Executors.newCachedThreadPool();
  85 + }
  86 +
  87 + @PreDestroy
  88 + public void stop() {
  89 + if (executor != null) {
  90 + executor.shutdownNow();
  91 + }
77 } 92 }
78 93
79 } 94 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -108,7 +108,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -108,7 +108,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
108 this.quotaService = context.getQuotaService(); 108 this.quotaService = context.getQuotaService();
109 this.sslHandler = context.getSslHandler(); 109 this.sslHandler = context.getSslHandler();
110 this.mqttQoSMap = new ConcurrentHashMap<>(); 110 this.mqttQoSMap = new ConcurrentHashMap<>();
111 - this.deviceSessionCtx = new DeviceSessionCtx(mqttQoSMap); 111 + this.deviceSessionCtx = new DeviceSessionCtx(sessionId, mqttQoSMap);
112 } 112 }
113 113
114 @Override 114 @Override
@@ -176,42 +176,40 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -176,42 +176,40 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
176 176
177 if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { 177 if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
178 if (gatewaySessionCtx != null) { 178 if (gatewaySessionCtx != null) {
179 - gatewaySessionCtx.setChannel(ctx);  
180 -// handleMqttPublishMsg(topicName, msgId, mqttMsg); 179 + handleGatewayPublishMsg(topicName, msgId, mqttMsg);
181 } 180 }
182 } else { 181 } else {
183 processDevicePublish(ctx, mqttMsg, topicName, msgId); 182 processDevicePublish(ctx, mqttMsg, topicName, msgId);
184 } 183 }
185 } 184 }
186 185
187 - //  
188 -// private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {  
189 -// try {  
190 -// switch (topicName) {  
191 -// case GATEWAY_TELEMETRY_TOPIC:  
192 -// gatewaySessionCtx.onDeviceTelemetry(mqttMsg);  
193 -// break;  
194 -// case GATEWAY_ATTRIBUTES_TOPIC:  
195 -// gatewaySessionCtx.onDeviceAttributes(mqttMsg);  
196 -// break;  
197 -// case GATEWAY_ATTRIBUTES_REQUEST_TOPIC:  
198 -// gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);  
199 -// break;  
200 -// case GATEWAY_RPC_TOPIC:  
201 -// gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);  
202 -// break;  
203 -// case GATEWAY_CONNECT_TOPIC:  
204 -// gatewaySessionCtx.onDeviceConnect(mqttMsg);  
205 -// break;  
206 -// case GATEWAY_DISCONNECT_TOPIC:  
207 -// gatewaySessionCtx.onDeviceDisconnect(mqttMsg);  
208 -// break;  
209 -// }  
210 -// } catch (RuntimeException | AdaptorException e) {  
211 -// log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);  
212 -// }  
213 -// }  
214 -// 186 + private void handleGatewayPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
  187 + try {
  188 + switch (topicName) {
  189 + case MqttTopics.GATEWAY_TELEMETRY_TOPIC:
  190 + gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
  191 + break;
  192 + case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
  193 + gatewaySessionCtx.onDeviceAttributes(mqttMsg);
  194 + break;
  195 + case MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
  196 + gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
  197 + break;
  198 + case MqttTopics.GATEWAY_RPC_TOPIC:
  199 + gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
  200 + break;
  201 + case MqttTopics.GATEWAY_CONNECT_TOPIC:
  202 + gatewaySessionCtx.onDeviceConnect(mqttMsg);
  203 + break;
  204 + case MqttTopics.GATEWAY_DISCONNECT_TOPIC:
  205 + gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
  206 + break;
  207 + }
  208 + } catch (RuntimeException | AdaptorException e) {
  209 + log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
  210 + }
  211 + }
  212 +
215 private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { 213 private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
216 try { 214 try {
217 if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { 215 if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
@@ -223,10 +221,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -223,10 +221,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
223 } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { 221 } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
224 TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg); 222 TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
225 transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); 223 transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
226 - } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)){ 224 + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
227 TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = adaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg); 225 TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = adaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg);
228 transportService.process(sessionInfo, rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); 226 transportService.process(sessionInfo, rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
229 - } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)){ 227 + } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
230 TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg); 228 TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
231 transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); 229 transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
232 } 230 }
@@ -469,7 +467,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -469,7 +467,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
469 if (infoNode != null) { 467 if (infoNode != null) {
470 JsonNode gatewayNode = infoNode.get("gateway"); 468 JsonNode gatewayNode = infoNode.get("gateway");
471 if (gatewayNode != null && gatewayNode.asBoolean()) { 469 if (gatewayNode != null && gatewayNode.asBoolean()) {
472 - gatewaySessionCtx = new GatewaySessionCtx(deviceSessionCtx); 470 + gatewaySessionCtx = new GatewaySessionCtx(context, deviceSessionCtx, sessionId);
473 } 471 }
474 } 472 }
475 } catch (IOException e) { 473 } catch (IOException e) {
@@ -477,7 +475,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -477,7 +475,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
477 } 475 }
478 } 476 }
479 477
480 - private SessionEventMsg getSessionEventMsg(SessionEvent event) { 478 + public static SessionEventMsg getSessionEventMsg(SessionEvent event) {
481 return SessionEventMsg.newBuilder() 479 return SessionEventMsg.newBuilder()
482 .setSessionType(TransportProtos.SessionType.ASYNC) 480 .setSessionType(TransportProtos.SessionType.ASYNC)
483 .setEvent(event).build(); 481 .setEvent(event).build();
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.adaptors; @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.mqtt.adaptors;
17 17
18 import com.google.gson.Gson; 18 import com.google.gson.Gson;
19 import com.google.gson.JsonElement; 19 import com.google.gson.JsonElement;
  20 +import com.google.gson.JsonObject;
20 import com.google.gson.JsonParser; 21 import com.google.gson.JsonParser;
21 import com.google.gson.JsonSyntaxException; 22 import com.google.gson.JsonSyntaxException;
22 import io.netty.buffer.ByteBuf; 23 import io.netty.buffer.ByteBuf;
@@ -26,7 +27,8 @@ import io.netty.handler.codec.mqtt.*; @@ -26,7 +27,8 @@ import io.netty.handler.codec.mqtt.*;
26 import lombok.extern.slf4j.Slf4j; 27 import lombok.extern.slf4j.Slf4j;
27 import org.springframework.stereotype.Component; 28 import org.springframework.stereotype.Component;
28 import org.springframework.util.StringUtils; 29 import org.springframework.util.StringUtils;
29 -import org.thingsboard.server.common.data.id.SessionId; 30 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  31 +import org.thingsboard.server.common.data.kv.KvEntry;
30 import org.thingsboard.server.common.msg.core.*; 32 import org.thingsboard.server.common.msg.core.*;
31 import org.thingsboard.server.common.msg.kv.AttributesKVMsg; 33 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
32 import org.thingsboard.server.common.msg.session.*; 34 import org.thingsboard.server.common.msg.session.*;
@@ -36,12 +38,15 @@ import org.thingsboard.server.gen.transport.TransportProtos; @@ -36,12 +38,15 @@ import org.thingsboard.server.gen.transport.TransportProtos;
36 import org.thingsboard.server.transport.mqtt.MqttTopics; 38 import org.thingsboard.server.transport.mqtt.MqttTopics;
37 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; 39 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
38 import org.thingsboard.server.transport.mqtt.MqttTransportHandler; 40 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
  41 +import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext;
39 42
40 import java.nio.charset.Charset; 43 import java.nio.charset.Charset;
41 import java.util.Arrays; 44 import java.util.Arrays;
42 import java.util.HashSet; 45 import java.util.HashSet;
  46 +import java.util.List;
43 import java.util.Optional; 47 import java.util.Optional;
44 import java.util.Set; 48 import java.util.Set;
  49 +import java.util.UUID;
45 50
46 /** 51 /**
47 * @author Andrew Shvayka 52 * @author Andrew Shvayka
@@ -55,7 +60,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -55,7 +60,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
55 private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); 60 private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
56 61
57 @Override 62 @Override
58 - public TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { 63 + public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
59 String payload = validatePayload(ctx.getSessionId(), inbound.payload()); 64 String payload = validatePayload(ctx.getSessionId(), inbound.payload());
60 try { 65 try {
61 return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload)); 66 return JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload));
@@ -65,7 +70,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -65,7 +70,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
65 } 70 }
66 71
67 @Override 72 @Override
68 - public TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { 73 + public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
69 String payload = validatePayload(ctx.getSessionId(), inbound.payload()); 74 String payload = validatePayload(ctx.getSessionId(), inbound.payload());
70 try { 75 try {
71 return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload)); 76 return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
@@ -75,7 +80,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -75,7 +80,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
75 } 80 }
76 81
77 @Override 82 @Override
78 - public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { 83 + public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
79 String topicName = inbound.variableHeader().topicName(); 84 String topicName = inbound.variableHeader().topicName();
80 try { 85 try {
81 TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder(); 86 TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
@@ -98,7 +103,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -98,7 +103,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
98 } 103 }
99 104
100 @Override 105 @Override
101 - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { 106 + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
102 String topicName = inbound.variableHeader().topicName(); 107 String topicName = inbound.variableHeader().topicName();
103 try { 108 try {
104 Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length())); 109 Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));
@@ -111,7 +116,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -111,7 +116,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
111 } 116 }
112 117
113 @Override 118 @Override
114 - public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { 119 + public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
115 String topicName = inbound.variableHeader().topicName(); 120 String topicName = inbound.variableHeader().topicName();
116 String payload = validatePayload(ctx.getSessionId(), inbound.payload()); 121 String payload = validatePayload(ctx.getSessionId(), inbound.payload());
117 try { 122 try {
@@ -123,7 +128,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -123,7 +128,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
123 } 128 }
124 129
125 @Override 130 @Override
126 - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException { 131 + public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
127 if (!StringUtils.isEmpty(responseMsg.getError())) { 132 if (!StringUtils.isEmpty(responseMsg.getError())) {
128 throw new AdaptorException(responseMsg.getError()); 133 throw new AdaptorException(responseMsg.getError());
129 } else { 134 } else {
@@ -138,99 +143,37 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -138,99 +143,37 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
138 } 143 }
139 144
140 @Override 145 @Override
141 - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {  
142 - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg))); 146 + public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
  147 + if (!StringUtils.isEmpty(responseMsg.getError())) {
  148 + throw new AdaptorException(responseMsg.getError());
  149 + } else {
  150 + JsonObject result = JsonConverter.getJsonObjectForGateway(responseMsg);
  151 + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, result));
  152 + }
143 } 153 }
144 154
145 @Override 155 @Override
146 - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {  
147 - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false))); 156 + public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
  157 + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, JsonConverter.toJson(notificationMsg)));
148 } 158 }
149 159
150 @Override 160 @Override
151 - public Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {  
152 - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse))); 161 + public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
  162 + JsonObject result = JsonConverter.getJsonObjectForGateway(deviceName, notificationMsg);
  163 + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, result));
153 } 164 }
154 165
155 @Override 166 @Override
156 - public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, SessionMsgType type, MqttMessage inbound) throws AdaptorException {  
157 - FromDeviceMsg msg;  
158 - switch (type) {  
159 - case POST_TELEMETRY_REQUEST:  
160 - msg = convertToTelemetryUploadRequest(ctx, (MqttPublishMessage) inbound);  
161 - break;  
162 - case POST_ATTRIBUTES_REQUEST:  
163 - msg = convertToUpdateAttributesRequest(ctx, (MqttPublishMessage) inbound);  
164 - break;  
165 - case SUBSCRIBE_ATTRIBUTES_REQUEST:  
166 - msg = new AttributesSubscribeMsg();  
167 - break;  
168 - case UNSUBSCRIBE_ATTRIBUTES_REQUEST:  
169 - msg = new AttributesUnsubscribeMsg();  
170 - break;  
171 - case SUBSCRIBE_RPC_COMMANDS_REQUEST:  
172 - msg = new RpcSubscribeMsg();  
173 - break;  
174 - case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:  
175 - msg = new RpcUnsubscribeMsg();  
176 - break;  
177 - case GET_ATTRIBUTES_REQUEST:  
178 - msg = convertToGetAttributesRequest(ctx, (MqttPublishMessage) inbound);  
179 - break;  
180 - case TO_DEVICE_RPC_RESPONSE:  
181 - msg = convertToRpcCommandResponse(ctx, (MqttPublishMessage) inbound);  
182 - break;  
183 - case TO_SERVER_RPC_REQUEST:  
184 - msg = null;//convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound);  
185 - break;  
186 - default:  
187 - log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type);  
188 - throw new AdaptorException(new IllegalArgumentException("Unsupported msg type: " + type + "!"));  
189 - }  
190 - return new BasicAdaptorToSessionActorMsg(ctx, msg); 167 + public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
  168 + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false)));
191 } 169 }
192 170
193 @Override 171 @Override
194 - public Optional<MqttMessage> convertToAdaptorMsg(DeviceSessionCtx ctx, SessionActorToAdaptorMsg sessionMsg) throws AdaptorException {  
195 - MqttMessage result = null;  
196 - ToDeviceMsg msg = sessionMsg.getMsg();  
197 - switch (msg.getSessionMsgType()) {  
198 - case STATUS_CODE_RESPONSE:  
199 - case GET_ATTRIBUTES_RESPONSE:  
200 - ResponseMsg<?> responseMsg = (ResponseMsg) msg;  
201 - Optional<Exception> responseError = responseMsg.getError();  
202 - if (responseMsg.isSuccess()) {  
203 - result = convertResponseMsg(ctx, msg, responseMsg, responseError);  
204 - } else {  
205 - if (responseError.isPresent()) {  
206 - throw new AdaptorException(responseError.get());  
207 - }  
208 - }  
209 - break;  
210 - case ATTRIBUTES_UPDATE_NOTIFICATION:  
211 - AttributesUpdateNotification notification = (AttributesUpdateNotification) msg;  
212 - result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, notification.getData(), false);  
213 - break;  
214 - case TO_DEVICE_RPC_REQUEST:  
215 - ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;  
216 - result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), rpcRequest);  
217 - break;  
218 - case TO_SERVER_RPC_RESPONSE:  
219 -// ToServerRpcResponseMsg rpcResponse = (ToServerRpcResponseMsg) msg;  
220 -// result = createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(),  
221 -// rpcResponse);  
222 - break;  
223 - case RULE_ENGINE_ERROR:  
224 - RuleEngineErrorMsg errorMsg = (RuleEngineErrorMsg) msg;  
225 - result = createMqttPublishMsg(ctx, "errors", JsonConverter.toErrorJson(errorMsg.getErrorMsg()));  
226 - break;  
227 - default:  
228 - break;  
229 - }  
230 - return Optional.ofNullable(result); 172 + public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
  173 + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
231 } 174 }
232 175
233 - private MqttMessage convertResponseMsg(DeviceSessionCtx ctx, ToDeviceMsg msg, 176 + private MqttMessage convertResponseMsg(MqttDeviceAwareSessionContext ctx, ToDeviceMsg msg,
234 ResponseMsg<?> responseMsg, Optional<Exception> responseError) throws AdaptorException { 177 ResponseMsg<?> responseMsg, Optional<Exception> responseError) throws AdaptorException {
235 MqttMessage result = null; 178 MqttMessage result = null;
236 SessionMsgType requestMsgType = responseMsg.getRequestMsgType(); 179 SessionMsgType requestMsgType = responseMsg.getRequestMsgType();
@@ -255,19 +198,19 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -255,19 +198,19 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
255 return result; 198 return result;
256 } 199 }
257 200
258 - private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, AttributesKVMsg msg, boolean asMap) { 201 + private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, AttributesKVMsg msg, boolean asMap) {
259 return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, asMap)); 202 return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, asMap));
260 } 203 }
261 204
262 - private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToDeviceRpcRequestMsg msg) { 205 + private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, ToDeviceRpcRequestMsg msg) {
263 return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false)); 206 return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false));
264 } 207 }
265 208
266 - private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) { 209 + private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) {
267 return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg)); 210 return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg));
268 } 211 }
269 212
270 - private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) { 213 + private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, JsonElement json) {
271 MqttFixedHeader mqttFixedHeader = 214 MqttFixedHeader mqttFixedHeader =
272 new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0); 215 new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0);
273 MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId()); 216 MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
@@ -276,7 +219,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -276,7 +219,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
276 return new MqttPublishMessage(mqttFixedHeader, header, payload); 219 return new MqttPublishMessage(mqttFixedHeader, header, payload);
277 } 220 }
278 221
279 - private FromDeviceMsg convertToGetAttributesRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { 222 + private FromDeviceMsg convertToGetAttributesRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
280 String topicName = inbound.variableHeader().topicName(); 223 String topicName = inbound.variableHeader().topicName();
281 try { 224 try {
282 Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length())); 225 Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
@@ -295,7 +238,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -295,7 +238,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
295 } 238 }
296 } 239 }
297 240
298 - private FromDeviceMsg convertToRpcCommandResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { 241 + private FromDeviceMsg convertToRpcCommandResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
299 String topicName = inbound.variableHeader().topicName(); 242 String topicName = inbound.variableHeader().topicName();
300 try { 243 try {
301 Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length())); 244 Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));
@@ -336,7 +279,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -336,7 +279,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
336 } 279 }
337 } 280 }
338 281
339 - public static JsonElement validateJsonPayload(SessionId sessionId, ByteBuf payloadData) throws AdaptorException { 282 + public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
340 String payload = validatePayload(sessionId, payloadData); 283 String payload = validatePayload(sessionId, payloadData);
341 try { 284 try {
342 return new JsonParser().parse(payload); 285 return new JsonParser().parse(payload);
@@ -345,11 +288,11 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -345,11 +288,11 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
345 } 288 }
346 } 289 }
347 290
348 - public static String validatePayload(SessionId sessionId, ByteBuf payloadData) throws AdaptorException { 291 + public static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
349 try { 292 try {
350 String payload = payloadData.toString(UTF8); 293 String payload = payloadData.toString(UTF8);
351 if (payload == null) { 294 if (payload == null) {
352 - log.warn("[{}] Payload is empty!", sessionId.toUidStr()); 295 + log.warn("[{}] Payload is empty!", sessionId);
353 throw new AdaptorException(new IllegalArgumentException("Payload is empty!")); 296 throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
354 } 297 }
355 return payload; 298 return payload;
@@ -17,41 +17,44 @@ package org.thingsboard.server.transport.mqtt.adaptors; @@ -17,41 +17,44 @@ package org.thingsboard.server.transport.mqtt.adaptors;
17 17
18 import io.netty.handler.codec.mqtt.MqttMessage; 18 import io.netty.handler.codec.mqtt.MqttMessage;
19 import io.netty.handler.codec.mqtt.MqttPublishMessage; 19 import io.netty.handler.codec.mqtt.MqttPublishMessage;
20 -import org.thingsboard.server.common.transport.TransportAdaptor;  
21 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 20 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
22 -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;  
23 -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;  
24 -import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;  
25 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 21 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; 22 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; 23 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
29 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
30 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; 26 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
31 -import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; 27 +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
  28 +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
  29 +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
  30 +import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext;
32 31
33 import java.util.Optional; 32 import java.util.Optional;
34 33
35 /** 34 /**
36 * @author Andrew Shvayka 35 * @author Andrew Shvayka
37 */ 36 */
38 -public interface MqttTransportAdaptor extends TransportAdaptor<DeviceSessionCtx, MqttMessage, MqttMessage> { 37 +public interface MqttTransportAdaptor {
  38 +
  39 + PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
  40 +
  41 + PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
39 42
40 - PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; 43 + GetAttributeRequestMsg convertToGetAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
41 44
42 - PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; 45 + ToDeviceRpcResponseMsg convertToDeviceRpcResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
43 46
44 - GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException; 47 + ToServerRpcRequestMsg convertToServerRpcRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage mqttMsg) throws AdaptorException;
45 48
46 - ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException; 49 + Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException;
47 50
48 - ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg) throws AdaptorException; 51 + Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException;
49 52
50 - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, GetAttributeResponseMsg responseMsg) throws AdaptorException; 53 + Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
51 54
52 - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException; 55 + Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
53 56
54 - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; 57 + Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
55 58
56 - Optional<MqttMessage> convertToPublish(DeviceSessionCtx ctx, ToServerRpcResponseMsg rpcResponse); 59 + Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException;
57 } 60 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,19 +19,14 @@ import io.netty.channel.ChannelHandlerContext; @@ -19,19 +19,14 @@ import io.netty.channel.ChannelHandlerContext;
19 import io.netty.handler.codec.mqtt.*; 19 import io.netty.handler.codec.mqtt.*;
20 import lombok.Getter; 20 import lombok.Getter;
21 import lombok.extern.slf4j.Slf4j; 21 import lombok.extern.slf4j.Slf4j;
22 -import org.thingsboard.server.common.data.id.SessionId;  
23 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; 22 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
24 import org.thingsboard.server.common.msg.session.SessionCtrlMsg; 23 import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
25 import org.thingsboard.server.common.msg.session.SessionType; 24 import org.thingsboard.server.common.msg.session.SessionType;
26 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; 25 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
27 import org.thingsboard.server.common.msg.session.ex.SessionException; 26 import org.thingsboard.server.common.msg.session.ex.SessionException;
28 -import org.thingsboard.server.common.transport.SessionMsgProcessor;  
29 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 27 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
30 -import org.thingsboard.server.common.transport.auth.DeviceAuthService;  
31 -import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;  
32 -import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;  
33 28
34 -import java.util.Map; 29 +import java.util.UUID;
35 import java.util.concurrent.ConcurrentMap; 30 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.atomic.AtomicInteger; 31 import java.util.concurrent.atomic.AtomicInteger;
37 32
@@ -41,14 +36,14 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -41,14 +36,14 @@ import java.util.concurrent.atomic.AtomicInteger;
41 @Slf4j 36 @Slf4j
42 public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { 37 public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
43 38
44 - private final MqttSessionId sessionId; 39 + private final UUID sessionId;
45 @Getter 40 @Getter
46 private ChannelHandlerContext channel; 41 private ChannelHandlerContext channel;
47 private AtomicInteger msgIdSeq = new AtomicInteger(0); 42 private AtomicInteger msgIdSeq = new AtomicInteger(0);
48 43
49 - public DeviceSessionCtx(ConcurrentMap<String, Integer> mqttQoSMap) { 44 + public DeviceSessionCtx(UUID sessionId, ConcurrentMap<String, Integer> mqttQoSMap) {
50 super(null, null, mqttQoSMap); 45 super(null, null, mqttQoSMap);
51 - this.sessionId = new MqttSessionId(); 46 + this.sessionId = sessionId;
52 } 47 }
53 48
54 @Override 49 @Override
@@ -94,7 +89,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { @@ -94,7 +89,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
94 } 89 }
95 90
96 @Override 91 @Override
97 - public SessionId getSessionId() { 92 + public UUID getSessionId() {
98 return sessionId; 93 return sessionId;
99 } 94 }
100 95
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,70 +16,80 @@ @@ -16,70 +16,80 @@
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 import com.google.gson.Gson; 18 import com.google.gson.Gson;
19 -import com.google.gson.JsonArray;  
20 import com.google.gson.JsonElement; 19 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject; 20 import com.google.gson.JsonObject;
  21 +import com.sun.xml.internal.bind.v2.TODO;
22 import io.netty.buffer.ByteBuf; 22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufAllocator; 23 import io.netty.buffer.ByteBufAllocator;
24 import io.netty.buffer.UnpooledByteBufAllocator; 24 import io.netty.buffer.UnpooledByteBufAllocator;
25 -import io.netty.handler.codec.mqtt.*;  
26 -import org.thingsboard.server.common.data.Device;  
27 -import org.thingsboard.server.common.data.id.SessionId; 25 +import io.netty.handler.codec.mqtt.MqttFixedHeader;
  26 +import io.netty.handler.codec.mqtt.MqttMessage;
  27 +import io.netty.handler.codec.mqtt.MqttMessageType;
  28 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
  29 +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
  30 +import lombok.extern.slf4j.Slf4j;
28 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 31 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
29 import org.thingsboard.server.common.data.kv.KvEntry; 32 import org.thingsboard.server.common.data.kv.KvEntry;
30 -import org.thingsboard.server.common.msg.core.*; 33 +import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
  34 +import org.thingsboard.server.common.msg.core.GetAttributesResponse;
  35 +import org.thingsboard.server.common.msg.core.ResponseMsg;
  36 +import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
31 import org.thingsboard.server.common.msg.kv.AttributesKVMsg; 37 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
32 -import org.thingsboard.server.common.msg.session.*;  
33 -import org.thingsboard.server.common.msg.session.ex.SessionException; 38 +import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
  39 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  40 +import org.thingsboard.server.common.msg.session.ToDeviceMsg;
  41 +import org.thingsboard.server.common.transport.SessionMsgListener;
34 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 42 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
35 -import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 43 +import org.thingsboard.server.gen.transport.TransportProtos;
  44 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
  45 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
36 import org.thingsboard.server.transport.mqtt.MqttTopics; 46 import org.thingsboard.server.transport.mqtt.MqttTopics;
37 import org.thingsboard.server.transport.mqtt.MqttTransportHandler; 47 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
38 48
39 import java.nio.charset.Charset; 49 import java.nio.charset.Charset;
40 import java.util.List; 50 import java.util.List;
41 -import java.util.Map;  
42 import java.util.Optional; 51 import java.util.Optional;
  52 +import java.util.UUID;
43 import java.util.concurrent.ConcurrentMap; 53 import java.util.concurrent.ConcurrentMap;
44 import java.util.concurrent.atomic.AtomicInteger; 54 import java.util.concurrent.atomic.AtomicInteger;
45 55
46 /** 56 /**
47 * Created by ashvayka on 19.01.17. 57 * Created by ashvayka on 19.01.17.
48 */ 58 */
49 -public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { 59 +@Slf4j
  60 +public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
50 61
51 private static final Gson GSON = new Gson(); 62 private static final Gson GSON = new Gson();
52 private static final Charset UTF8 = Charset.forName("UTF-8"); 63 private static final Charset UTF8 = Charset.forName("UTF-8");
53 private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); 64 private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
54 - public static final String DEVICE_PROPERTY = "device";  
55 65
56 - private GatewaySessionCtx parent;  
57 - private final MqttSessionId sessionId; 66 + private final GatewaySessionCtx parent;
  67 + private final UUID sessionId;
  68 + private final SessionInfoProto sessionInfo;
58 private volatile boolean closed; 69 private volatile boolean closed;
59 private AtomicInteger msgIdSeq = new AtomicInteger(0); 70 private AtomicInteger msgIdSeq = new AtomicInteger(0);
60 71
61 - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {  
62 - super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap); 72 + public GatewayDeviceSessionCtx(GatewaySessionCtx parent, DeviceInfoProto deviceInfo, ConcurrentMap<String, Integer> mqttQoSMap) {
  73 + super(mqttQoSMap);
63 this.parent = parent; 74 this.parent = parent;
64 - this.sessionId = new MqttSessionId(); 75 + this.sessionId = UUID.randomUUID();
  76 + this.sessionInfo = SessionInfoProto.newBuilder()
  77 + .setNodeId(parent.getNodeId())
  78 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  79 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  80 + .setDeviceIdMSB(deviceInfo.getDeviceIdMSB())
  81 + .setDeviceIdLSB(deviceInfo.getDeviceIdLSB())
  82 + .setTenantIdMSB(deviceInfo.getTenantIdMSB())
  83 + .setTenantIdLSB(deviceInfo.getTenantIdLSB())
  84 + .build();
  85 + setDeviceInfo(deviceInfo);
65 } 86 }
66 87
67 @Override 88 @Override
68 - public SessionId getSessionId() { 89 + public UUID getSessionId() {
69 return sessionId; 90 return sessionId;
70 } 91 }
71 92
72 - @Override  
73 - public SessionType getSessionType() {  
74 - return SessionType.ASYNC;  
75 - }  
76 -  
77 - @Override  
78 - public void onMsg(SessionActorToAdaptorMsg sessionMsg) throws SessionException {  
79 - Optional<MqttMessage> message = getToDeviceMsg(sessionMsg);  
80 - message.ifPresent(parent::writeAndFlush);  
81 - }  
82 -  
83 private Optional<MqttMessage> getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) { 93 private Optional<MqttMessage> getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) {
84 ToDeviceMsg msg = sessionMsg.getMsg(); 94 ToDeviceMsg msg = sessionMsg.getMsg();
85 switch (msg.getSessionMsgType()) { 95 switch (msg.getSessionMsgType()) {
@@ -113,25 +123,6 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { @@ -113,25 +123,6 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
113 return Optional.empty(); 123 return Optional.empty();
114 } 124 }
115 125
116 - @Override  
117 - public void onMsg(SessionCtrlMsg msg) throws SessionException {  
118 - //Do nothing  
119 - }  
120 -  
121 - @Override  
122 - public boolean isClosed() {  
123 - return closed;  
124 - }  
125 -  
126 - public void setClosed(boolean closed) {  
127 - this.closed = closed;  
128 - }  
129 -  
130 - @Override  
131 - public long getTimeout() {  
132 - return 0;  
133 - }  
134 -  
135 private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) { 126 private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) {
136 JsonObject result = new JsonObject(); 127 JsonObject result = new JsonObject();
137 result.addProperty("id", response.getRequestId()); 128 result.addProperty("id", response.getRequestId());
@@ -204,4 +195,40 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { @@ -204,4 +195,40 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
204 return new MqttPublishMessage(mqttFixedHeader, header, payload); 195 return new MqttPublishMessage(mqttFixedHeader, header, payload);
205 } 196 }
206 197
  198 + SessionInfoProto getSessionInfo() {
  199 + return sessionInfo;
  200 + }
  201 +
  202 + @Override
  203 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
  204 + try {
  205 + parent.getAdaptor().convertToGatewayPublish(this, response).ifPresent(parent::writeAndFlush);
  206 + } catch (Exception e) {
  207 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  208 + }
  209 + }
  210 +
  211 + @Override
  212 + public void onAttributeUpdate(TransportProtos.AttributeUpdateNotificationMsg notification) {
  213 + try {
  214 + parent.getAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush);
  215 + } catch (Exception e) {
  216 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  217 + }
  218 + }
  219 +
  220 + @Override
  221 + public void onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  222 + parent.deregisterSession(getDeviceInfo().getDeviceName());
  223 + }
  224 +
  225 + @Override
  226 + public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg toDeviceRequest) {
  227 +
  228 + }
  229 +
  230 + @Override
  231 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
  232 + TODO
  233 + }
207 } 234 }
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,11 @@ @@ -15,6 +15,11 @@
15 */ 15 */
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
  18 +
  19 +import com.google.common.util.concurrent.FutureCallback;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import com.google.common.util.concurrent.SettableFuture;
18 import com.google.gson.JsonArray; 23 import com.google.gson.JsonArray;
19 import com.google.gson.JsonElement; 24 import com.google.gson.JsonElement;
20 import com.google.gson.JsonNull; 25 import com.google.gson.JsonNull;
@@ -25,28 +30,28 @@ import io.netty.handler.codec.mqtt.MqttMessage; @@ -25,28 +30,28 @@ import io.netty.handler.codec.mqtt.MqttMessage;
25 import io.netty.handler.codec.mqtt.MqttPublishMessage; 30 import io.netty.handler.codec.mqtt.MqttPublishMessage;
26 import lombok.extern.slf4j.Slf4j; 31 import lombok.extern.slf4j.Slf4j;
27 import org.springframework.util.StringUtils; 32 import org.springframework.util.StringUtils;
28 -import org.thingsboard.server.common.data.Device;  
29 -import org.thingsboard.server.common.data.id.SessionId;  
30 -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;  
31 -import org.thingsboard.server.common.data.relation.EntityRelation;  
32 -import org.thingsboard.server.common.msg.core.*;  
33 -import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;  
34 -import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;  
35 -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;  
36 -import org.thingsboard.server.common.transport.SessionMsgProcessor; 33 +import org.thingsboard.server.common.transport.TransportService;
  34 +import org.thingsboard.server.common.transport.TransportServiceCallback;
37 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 35 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
38 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 36 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
39 -import org.thingsboard.server.common.transport.auth.DeviceAuthService;  
40 -import org.thingsboard.server.dao.device.DeviceService;  
41 -import org.thingsboard.server.dao.relation.RelationService; 37 +import org.thingsboard.server.gen.transport.TransportProtos;
  38 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
  39 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
  40 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
  41 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  42 +import org.thingsboard.server.transport.mqtt.MqttTransportContext;
42 import org.thingsboard.server.transport.mqtt.MqttTransportHandler; 43 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
43 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; 44 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
  45 +import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
44 46
45 -import java.util.*; 47 +import javax.annotation.Nullable;
  48 +import java.util.Collections;
  49 +import java.util.HashSet;
  50 +import java.util.Map;
  51 +import java.util.Set;
  52 +import java.util.UUID;
  53 +import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentMap; 54 import java.util.concurrent.ConcurrentMap;
47 -import java.util.stream.Collectors;  
48 -  
49 -import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;  
50 55
51 /** 56 /**
52 * Created by ashvayka on 19.01.17. 57 * Created by ashvayka on 19.01.17.
@@ -55,184 +60,235 @@ import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.val @@ -55,184 +60,235 @@ import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.val
55 public class GatewaySessionCtx { 60 public class GatewaySessionCtx {
56 61
57 private static final String DEFAULT_DEVICE_TYPE = "default"; 62 private static final String DEFAULT_DEVICE_TYPE = "default";
58 - public static final String CAN_T_PARSE_VALUE = "Can't parse value: ";  
59 - public static final String DEVICE_PROPERTY = "device";  
60 -// private final Device gateway;  
61 -// private final SessionId gatewaySessionId;  
62 -// private final SessionMsgProcessor processor;  
63 -// private final DeviceService deviceService;  
64 -// private final DeviceAuthService authService;  
65 -// private final RelationService relationService;  
66 -// private final Map<String, GatewayDeviceSessionCtx> devices;  
67 -// private final ConcurrentMap<String, Integer> mqttQoSMap;  
68 - private ChannelHandlerContext channel;  
69 -  
70 -// public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {  
71 -// this.processor = processor;  
72 -// this.deviceService = deviceService;  
73 -// this.authService = authService;  
74 -// this.relationService = relationService;  
75 -// this.gateway = gatewaySessionCtx.getDevice();  
76 -// this.gatewaySessionId = gatewaySessionCtx.getSessionId();  
77 -// this.devices = new HashMap<>();  
78 -// this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();  
79 -// }  
80 -  
81 - public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) { 63 + private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
  64 + private static final String DEVICE_PROPERTY = "device";
  65 +
  66 + private final MqttTransportContext context;
  67 + private final TransportService transportService;
  68 + private final DeviceInfoProto gateway;
  69 + private final UUID sessionId;
  70 + private final Map<String, GatewayDeviceSessionCtx> devices;
  71 + private final ConcurrentMap<String, Integer> mqttQoSMap;
  72 + private final ChannelHandlerContext channel;
82 73
  74 + public GatewaySessionCtx(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
  75 + this.context = context;
  76 + this.transportService = context.getTransportService();
  77 + this.gateway = deviceSessionCtx.getDeviceInfo();
  78 + this.sessionId = sessionId;
  79 + this.devices = new ConcurrentHashMap<>();
  80 + this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
  81 + this.channel = deviceSessionCtx.getChannel();
83 } 82 }
84 83
85 public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { 84 public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
86 JsonElement json = getJson(msg); 85 JsonElement json = getJson(msg);
87 String deviceName = checkDeviceName(getDeviceName(json)); 86 String deviceName = checkDeviceName(getDeviceName(json));
88 String deviceType = getDeviceType(json); 87 String deviceType = getDeviceType(json);
89 - onDeviceConnect(deviceName, deviceType);  
90 - ack(msg); 88 + Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<GatewayDeviceSessionCtx>() {
  89 + @Override
  90 + public void onSuccess(@Nullable GatewayDeviceSessionCtx result) {
  91 + ack(msg);
  92 + }
  93 +
  94 + @Override
  95 + public void onFailure(Throwable t) {
  96 + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t);
  97 +
  98 + }
  99 + }, context.getExecutor());
91 } 100 }
92 101
93 - private void onDeviceConnect(String deviceName, String deviceType) {  
94 -// if (!devices.containsKey(deviceName)) {  
95 -// Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);  
96 -// if (device == null) {  
97 -// device = new Device();  
98 -// device.setTenantId(gateway.getTenantId());  
99 -// device.setName(deviceName);  
100 -// device.setType(deviceType);  
101 -// device.setCustomerId(gateway.getCustomerId());  
102 -// device = deviceService.saveDevice(device);  
103 -// relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));  
104 -// processor.onDeviceAdded(device);  
105 -// }  
106 -// GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap);  
107 -// devices.put(deviceName, ctx);  
108 -// log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);  
109 -// processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));  
110 -// processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));  
111 -// } 102 + private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
  103 + SettableFuture<GatewayDeviceSessionCtx> future = SettableFuture.create();
  104 + GatewayDeviceSessionCtx result = devices.get(deviceName);
  105 + if (result == null) {
  106 + transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
  107 + .setDeviceName(deviceName)
  108 + .setDeviceType(deviceType)
  109 + .setGatewayIdMSB(gateway.getDeviceIdMSB())
  110 + .setGatewayIdLSB(gateway.getDeviceIdLSB()).build(),
  111 + new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg>() {
  112 + @Override
  113 + public void onSuccess(GetOrCreateDeviceFromGatewayResponseMsg msg) {
  114 + GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionCtx.this, msg.getDeviceInfo(), mqttQoSMap);
  115 + if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
  116 + SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
  117 + transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
  118 + transportService.registerSession(deviceSessionInfo, deviceSessionCtx);
  119 + }
  120 + future.set(devices.get(deviceName));
  121 + }
  122 +
  123 + @Override
  124 + public void onError(Throwable e) {
  125 + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
  126 + future.setException(e);
  127 + }
  128 + });
  129 + } else {
  130 + future.set(result);
  131 + }
  132 + return future;
112 } 133 }
113 134
114 public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException { 135 public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
115 -// String deviceName = checkDeviceName(getDeviceName(getJson(msg)));  
116 -// GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);  
117 -// if (deviceSessionCtx != null) {  
118 -// processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));  
119 -// deviceSessionCtx.setClosed(true);  
120 -// log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName);  
121 -// } else {  
122 -// log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName);  
123 -// }  
124 -// ack(msg); 136 + String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
  137 + deregisterSession(deviceName);
  138 + ack(msg);
  139 + }
  140 +
  141 + void deregisterSession(String deviceName) {
  142 + GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
  143 + if (deviceSessionCtx != null) {
  144 + deregisterSession(deviceName, deviceSessionCtx);
  145 + } else {
  146 + log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName);
  147 + }
125 } 148 }
126 149
127 public void onGatewayDisconnect() { 150 public void onGatewayDisconnect() {
128 -// devices.forEach((k, v) -> {  
129 -// processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));  
130 -// }); 151 + devices.forEach(this::deregisterSession);
131 } 152 }
132 153
133 public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException { 154 public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
134 -// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());  
135 -// int requestId = mqttMsg.variableHeader().messageId();  
136 -// if (json.isJsonObject()) {  
137 -// JsonObject jsonObj = json.getAsJsonObject();  
138 -// for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {  
139 -// String deviceName = checkDeviceConnected(deviceEntry.getKey());  
140 -// if (!deviceEntry.getValue().isJsonArray()) {  
141 -// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);  
142 -// }  
143 -// BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);  
144 -// JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();  
145 -// for (JsonElement element : deviceData) {  
146 -// JsonConverter.parseWithTs(request, element.getAsJsonObject());  
147 -// }  
148 -// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);  
149 -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),  
150 -// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));  
151 -// }  
152 -// } else {  
153 -// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);  
154 -// }  
155 - } 155 + JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
  156 + int msgId = mqttMsg.variableHeader().packetId();
  157 + if (json.isJsonObject()) {
  158 + JsonObject jsonObj = json.getAsJsonObject();
  159 + for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
  160 + String deviceName = deviceEntry.getKey();
  161 + Futures.addCallback(checkDeviceConnected(deviceName),
  162 + new FutureCallback<GatewayDeviceSessionCtx>() {
  163 + @Override
  164 + public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
  165 + if (!deviceEntry.getValue().isJsonArray()) {
  166 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  167 + }
  168 + TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(deviceEntry.getValue().getAsJsonArray());
  169 + transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
  170 + }
156 171
157 - public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {  
158 -// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());  
159 -// if (json.isJsonObject()) {  
160 -// JsonObject jsonObj = json.getAsJsonObject();  
161 -// String deviceName = checkDeviceConnected(jsonObj.get(DEVICE_PROPERTY).getAsString());  
162 -// Integer requestId = jsonObj.get("id").getAsInt();  
163 -// String data = jsonObj.get("data").toString();  
164 -// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);  
165 -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),  
166 -// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));  
167 -// ack(mqttMsg);  
168 -// } else {  
169 -// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);  
170 -// } 172 + @Override
  173 + public void onFailure(Throwable t) {
  174 + log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t);
  175 + }
  176 + }, context.getExecutor());
  177 + }
  178 + } else {
  179 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  180 + }
171 } 181 }
172 182
173 public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { 183 public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
174 -// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());  
175 -// int requestId = mqttMsg.variableHeader().messageId();  
176 -// if (json.isJsonObject()) {  
177 -// JsonObject jsonObj = json.getAsJsonObject();  
178 -// for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {  
179 -// String deviceName = checkDeviceConnected(deviceEntry.getKey());  
180 -// if (!deviceEntry.getValue().isJsonObject()) {  
181 -// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);  
182 -// }  
183 -// long ts = System.currentTimeMillis();  
184 -// BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId);  
185 -// JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();  
186 -// request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));  
187 -// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);  
188 -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),  
189 -// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));  
190 -// }  
191 -// } else {  
192 -// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);  
193 -// } 184 + JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
  185 + int msgId = mqttMsg.variableHeader().packetId();
  186 + if (json.isJsonObject()) {
  187 + JsonObject jsonObj = json.getAsJsonObject();
  188 + for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
  189 + String deviceName = deviceEntry.getKey();
  190 + Futures.addCallback(checkDeviceConnected(deviceName),
  191 + new FutureCallback<GatewayDeviceSessionCtx>() {
  192 + @Override
  193 + public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
  194 + if (!deviceEntry.getValue().isJsonObject()) {
  195 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  196 + }
  197 + TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(deviceEntry.getValue().getAsJsonObject());
  198 + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
  199 + }
  200 +
  201 + @Override
  202 + public void onFailure(Throwable t) {
  203 + log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t);
  204 + }
  205 + }, context.getExecutor());
  206 + }
  207 + } else {
  208 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  209 + }
  210 + }
  211 +
  212 + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {
  213 + JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
  214 + int msgId = mqttMsg.variableHeader().packetId();
  215 + if (json.isJsonObject()) {
  216 + JsonObject jsonObj = json.getAsJsonObject();
  217 + String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
  218 + Futures.addCallback(checkDeviceConnected(deviceName),
  219 + new FutureCallback<GatewayDeviceSessionCtx>() {
  220 + @Override
  221 + public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
  222 + Integer requestId = jsonObj.get("id").getAsInt();
  223 + String data = jsonObj.get("data").toString();
  224 + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  225 + .setRequestId(requestId).setPayload(data).build();
  226 + transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg));
  227 + }
  228 +
  229 + @Override
  230 + public void onFailure(Throwable t) {
  231 + log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t);
  232 + }
  233 + }, context.getExecutor());
  234 + } else {
  235 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  236 + }
194 } 237 }
195 238
196 public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException { 239 public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException {
197 -// JsonElement json = validateJsonPayload(gatewaySessionId, msg.payload());  
198 -// if (json.isJsonObject()) {  
199 -// JsonObject jsonObj = json.getAsJsonObject();  
200 -// int requestId = jsonObj.get("id").getAsInt();  
201 -// String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();  
202 -// boolean clientScope = jsonObj.get("client").getAsBoolean();  
203 -// Set<String> keys;  
204 -// if (jsonObj.has("key")) {  
205 -// keys = Collections.singleton(jsonObj.get("key").getAsString());  
206 -// } else {  
207 -// JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();  
208 -// keys = new HashSet<>();  
209 -// for (JsonElement keyObj : keysArray) {  
210 -// keys.add(keyObj.getAsString());  
211 -// }  
212 -// }  
213 -//  
214 -// BasicGetAttributesRequest request;  
215 -// if (clientScope) {  
216 -// request = new BasicGetAttributesRequest(requestId, keys, null);  
217 -// } else {  
218 -// request = new BasicGetAttributesRequest(requestId, null, keys);  
219 -// }  
220 -// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);  
221 -// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),  
222 -// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));  
223 -// ack(msg);  
224 -// } else {  
225 -// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);  
226 -// } 240 + JsonElement json = JsonMqttAdaptor.validateJsonPayload(sessionId, msg.payload());
  241 + if (json.isJsonObject()) {
  242 + JsonObject jsonObj = json.getAsJsonObject();
  243 + int requestId = jsonObj.get("id").getAsInt();
  244 + String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
  245 + boolean clientScope = jsonObj.get("client").getAsBoolean();
  246 + Set<String> keys;
  247 + if (jsonObj.has("key")) {
  248 + keys = Collections.singleton(jsonObj.get("key").getAsString());
  249 + } else {
  250 + JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
  251 + keys = new HashSet<>();
  252 + for (JsonElement keyObj : keysArray) {
  253 + keys.add(keyObj.getAsString());
  254 + }
  255 + }
  256 + TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
  257 + result.setRequestId(requestId);
  258 +
  259 + if (clientScope) {
  260 + result.addAllClientAttributeNames(keys);
  261 + } else {
  262 + result.addAllSharedAttributeNames(keys);
  263 + }
  264 + int msgId = msg.variableHeader().packetId();
  265 + TransportProtos.GetAttributeRequestMsg requestMsg = result.build();
  266 + Futures.addCallback(checkDeviceConnected(deviceName),
  267 + new FutureCallback<GatewayDeviceSessionCtx>() {
  268 + @Override
  269 + public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
  270 + transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg));
  271 + }
  272 +
  273 + @Override
  274 + public void onFailure(Throwable t) {
  275 + log.debug("[{}] Failed to process device teleemtry command: {}", sessionId, deviceName, t);
  276 + }
  277 + }, context.getExecutor());
  278 + ack(msg);
  279 + } else {
  280 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  281 + }
227 } 282 }
228 283
229 - private String checkDeviceConnected(String deviceName) {  
230 -// if (!devices.containsKey(deviceName)) {  
231 -// log.debug("[{}] Missing device [{}] for the gateway session", gatewaySessionId, deviceName);  
232 -// onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);  
233 -// }  
234 -// return deviceName;  
235 - return null; 284 + private ListenableFuture<GatewayDeviceSessionCtx> checkDeviceConnected(String deviceName) {
  285 + GatewayDeviceSessionCtx ctx = devices.get(deviceName);
  286 + if (ctx == null) {
  287 + log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceName);
  288 + return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
  289 + } else {
  290 + return Futures.immediateFuture(ctx);
  291 + }
236 } 292 }
237 293
238 private String checkDeviceName(String deviceName) { 294 private String checkDeviceName(String deviceName) {
@@ -253,32 +309,52 @@ public class GatewaySessionCtx { @@ -253,32 +309,52 @@ public class GatewaySessionCtx {
253 } 309 }
254 310
255 private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException { 311 private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
256 -// return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());  
257 - return null; 312 + return JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
  313 + }
  314 +
  315 + private void ack(MqttPublishMessage msg) {
  316 + if (msg.variableHeader().packetId() > 0) {
  317 + writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().packetId()));
  318 + }
258 } 319 }
259 320
260 - protected SessionMsgProcessor getProcessor() {  
261 -// return processor;  
262 - return null; 321 + void writeAndFlush(MqttMessage mqttMessage) {
  322 + channel.writeAndFlush(mqttMessage);
263 } 323 }
264 324
265 - DeviceAuthService getAuthService() {  
266 -// return authService;  
267 - return null; 325 + public String getNodeId() {
  326 + return context.getNodeId();
268 } 327 }
269 328
270 - public void setChannel(ChannelHandlerContext channel) {  
271 - this.channel = channel; 329 + private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) {
  330 + transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  331 + transportService.process(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
  332 + log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
272 } 333 }
273 334
274 - private void ack(MqttPublishMessage msg) {  
275 - if (msg.variableHeader().messageId() > 0) {  
276 - writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));  
277 - } 335 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final int msgId, final T msg) {
  336 + return new TransportServiceCallback<Void>() {
  337 + @Override
  338 + public void onSuccess(Void dummy) {
  339 + log.trace("[{}][{}] Published msg: {}", sessionId, deviceName, msg);
  340 + if (msgId > 0) {
  341 + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId));
  342 + }
  343 + }
  344 +
  345 + @Override
  346 + public void onError(Throwable e) {
  347 + log.trace("[{}] Failed to publish msg: {}", sessionId, deviceName, msg, e);
  348 + ctx.close();
  349 + }
  350 + };
278 } 351 }
279 352
280 - void writeAndFlush(MqttMessage mqttMessage) {  
281 - channel.writeAndFlush(mqttMessage); 353 + public MqttTransportContext getContext() {
  354 + return context;
282 } 355 }
283 356
  357 + public MqttTransportAdaptor getAdaptor() {
  358 + return context.getAdaptor();
  359 + }
284 } 360 }
@@ -31,12 +31,7 @@ public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionCo @@ -31,12 +31,7 @@ public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionCo
31 31
32 private final ConcurrentMap<String, Integer> mqttQoSMap; 32 private final ConcurrentMap<String, Integer> mqttQoSMap;
33 33
34 - public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<String, Integer> mqttQoSMap) {  
35 - super();  
36 - this.mqttQoSMap = mqttQoSMap;  
37 - }  
38 -  
39 - public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<String, Integer> mqttQoSMap) { 34 + public MqttDeviceAwareSessionContext(ConcurrentMap<String, Integer> mqttQoSMap) {
40 super(); 35 super();
41 this.mqttQoSMap = mqttQoSMap; 36 this.mqttQoSMap = mqttQoSMap;
42 } 37 }
@@ -15,9 +15,6 @@ @@ -15,9 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 -import org.thingsboard.server.common.data.id.SessionId;  
19 -  
20 -import java.util.UUID;  
21 import java.util.concurrent.atomic.AtomicLong; 18 import java.util.concurrent.atomic.AtomicLong;
22 19
23 /** 20 /**
1 /** 1 /**
2 * Copyright © 2016-2018 The Thingsboard Authors 2 * Copyright © 2016-2018 The Thingsboard Authors
3 - * 3 + * <p>
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - * 7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,9 @@ import org.springframework.stereotype.Service; @@ -25,7 +25,9 @@ import org.springframework.stereotype.Service;
25 import org.thingsboard.server.common.transport.SessionMsgListener; 25 import org.thingsboard.server.common.transport.SessionMsgListener;
26 import org.thingsboard.server.common.transport.TransportService; 26 import org.thingsboard.server.common.transport.TransportService;
27 import org.thingsboard.server.common.transport.TransportServiceCallback; 27 import org.thingsboard.server.common.transport.TransportServiceCallback;
28 -import org.thingsboard.server.gen.transport.TransportProtos; 28 +import org.thingsboard.server.gen.transport.TransportProtos.*;
  29 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
  30 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
29 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; 31 import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
30 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; 32 import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
31 import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; 33 import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
@@ -153,7 +155,7 @@ public class MqttTransportService implements TransportService { @@ -153,7 +155,7 @@ public class MqttTransportService implements TransportService {
153 try { 155 try {
154 ToTransportMsg toTransportMsg = mainConsumer.decode(record); 156 ToTransportMsg toTransportMsg = mainConsumer.decode(record);
155 if (toTransportMsg.hasToDeviceSessionMsg()) { 157 if (toTransportMsg.hasToDeviceSessionMsg()) {
156 - TransportProtos.DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg(); 158 + DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
157 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); 159 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
158 SessionMsgListener listener = sessions.get(sessionId); 160 SessionMsgListener listener = sessions.get(sessionId);
159 if (listener != null) { 161 if (listener != null) {
@@ -228,9 +230,16 @@ public class MqttTransportService implements TransportService { @@ -228,9 +230,16 @@ public class MqttTransportService implements TransportService {
228 } 230 }
229 231
230 @Override 232 @Override
  233 + public void process(GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback) {
  234 + AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getTenantIdMSB() + msg.getTenantIdLSB() + msg.getDeviceName(),
  235 + TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()),
  236 + response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, transportCallbackExecutor);
  237 + }
  238 +
  239 + @Override
231 public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) { 240 public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
232 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 241 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
233 - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 242 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
234 .setSessionEvent(msg).build() 243 .setSessionEvent(msg).build()
235 ).build(); 244 ).build();
236 send(sessionInfo, toRuleEngineMsg, callback); 245 send(sessionInfo, toRuleEngineMsg, callback);
@@ -239,7 +248,7 @@ public class MqttTransportService implements TransportService { @@ -239,7 +248,7 @@ public class MqttTransportService implements TransportService {
239 @Override 248 @Override
240 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { 249 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
241 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 250 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
242 - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 251 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
243 .setPostTelemetry(msg).build() 252 .setPostTelemetry(msg).build()
244 ).build(); 253 ).build();
245 send(sessionInfo, toRuleEngineMsg, callback); 254 send(sessionInfo, toRuleEngineMsg, callback);
@@ -248,52 +257,52 @@ public class MqttTransportService implements TransportService { @@ -248,52 +257,52 @@ public class MqttTransportService implements TransportService {
248 @Override 257 @Override
249 public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) { 258 public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
250 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 259 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
251 - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 260 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
252 .setPostAttributes(msg).build() 261 .setPostAttributes(msg).build()
253 ).build(); 262 ).build();
254 send(sessionInfo, toRuleEngineMsg, callback); 263 send(sessionInfo, toRuleEngineMsg, callback);
255 } 264 }
256 265
257 @Override 266 @Override
258 - public void process(SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) { 267 + public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
259 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 268 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
260 - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 269 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
261 .setGetAttributes(msg).build() 270 .setGetAttributes(msg).build()
262 ).build(); 271 ).build();
263 send(sessionInfo, toRuleEngineMsg, callback); 272 send(sessionInfo, toRuleEngineMsg, callback);
264 } 273 }
265 274
266 @Override 275 @Override
267 - public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) { 276 + public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
268 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 277 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
269 - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 278 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
270 .setSubscribeToAttributes(msg).build() 279 .setSubscribeToAttributes(msg).build()
271 ).build(); 280 ).build();
272 send(sessionInfo, toRuleEngineMsg, callback); 281 send(sessionInfo, toRuleEngineMsg, callback);
273 } 282 }
274 283
275 @Override 284 @Override
276 - public void process(SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) { 285 + public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
277 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 286 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
278 - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 287 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
279 .setSubscribeToRPC(msg).build() 288 .setSubscribeToRPC(msg).build()
280 ).build(); 289 ).build();
281 send(sessionInfo, toRuleEngineMsg, callback); 290 send(sessionInfo, toRuleEngineMsg, callback);
282 } 291 }
283 292
284 @Override 293 @Override
285 - public void process(SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) { 294 + public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
286 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 295 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
287 - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 296 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
288 .setToDeviceRPCCallResponse(msg).build() 297 .setToDeviceRPCCallResponse(msg).build()
289 ).build(); 298 ).build();
290 send(sessionInfo, toRuleEngineMsg, callback); 299 send(sessionInfo, toRuleEngineMsg, callback);
291 } 300 }
292 301
293 @Override 302 @Override
294 - public void process(SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) { 303 + public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
295 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( 304 ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
296 - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) 305 + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
297 .setToServerRPCCallRequest(msg).build() 306 .setToServerRPCCallRequest(msg).build()
298 ).build(); 307 ).build();
299 send(sessionInfo, toRuleEngineMsg, callback); 308 send(sessionInfo, toRuleEngineMsg, callback);