Commit cc5b18f30e51e983d6bd7292875b553f26f0a4b5

Authored by Andrew Shvayka
1 parent e5a1fb58

MQTT Gateway API Implementation

@@ -37,11 +37,16 @@ import org.thingsboard.server.common.msg.core.BasicRequest; @@ -37,11 +37,16 @@ import org.thingsboard.server.common.msg.core.BasicRequest;
37 import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest; 37 import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;
38 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; 38 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
39 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; 39 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
40 -import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;  
41 -import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;  
42 import org.thingsboard.server.common.msg.kv.AttributesKVMsg; 40 import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
43 import org.thingsboard.server.gen.transport.TransportProtos; 41 import org.thingsboard.server.gen.transport.TransportProtos;
44 -import org.thingsboard.server.gen.transport.TransportProtos.*; 42 +import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
  43 +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
  44 +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
  45 +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
  46 +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
  47 +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
  48 +import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
  49 +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
45 50
46 import java.util.ArrayList; 51 import java.util.ArrayList;
47 import java.util.List; 52 import java.util.List;
@@ -446,4 +451,10 @@ public class JsonConverter { @@ -446,4 +451,10 @@ public class JsonConverter {
446 return error; 451 return error;
447 } 452 }
448 453
  454 + public static JsonElement toGatewayJson(String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  455 + JsonObject result = new JsonObject();
  456 + result.addProperty(DEVICE_PROPERTY, deviceName);
  457 + result.add("data", JsonConverter.toJson(rpcRequest, true));
  458 + return result;
  459 + }
449 } 460 }
@@ -85,7 +85,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; @@ -85,7 +85,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
85 @Slf4j 85 @Slf4j
86 public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener { 86 public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<? super Void>>, SessionMsgListener {
87 87
88 - public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; 88 + private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE;
89 89
90 private final UUID sessionId; 90 private final UUID sessionId;
91 private final MqttTransportContext context; 91 private final MqttTransportContext context;
@@ -100,7 +100,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -100,7 +100,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
100 private volatile DeviceSessionCtx deviceSessionCtx; 100 private volatile DeviceSessionCtx deviceSessionCtx;
101 private volatile GatewaySessionCtx gatewaySessionCtx; 101 private volatile GatewaySessionCtx gatewaySessionCtx;
102 102
103 - public MqttTransportHandler(MqttTransportContext context) { 103 + MqttTransportHandler(MqttTransportContext context) {
104 this.sessionId = UUID.randomUUID(); 104 this.sessionId = UUID.randomUUID();
105 this.context = context; 105 this.context = context;
106 this.transportService = context.getTransportService(); 106 this.transportService = context.getTransportService();
@@ -23,27 +23,23 @@ import com.google.gson.JsonSyntaxException; @@ -23,27 +23,23 @@ import com.google.gson.JsonSyntaxException;
23 import io.netty.buffer.ByteBuf; 23 import io.netty.buffer.ByteBuf;
24 import io.netty.buffer.ByteBufAllocator; 24 import io.netty.buffer.ByteBufAllocator;
25 import io.netty.buffer.UnpooledByteBufAllocator; 25 import io.netty.buffer.UnpooledByteBufAllocator;
26 -import io.netty.handler.codec.mqtt.*; 26 +import io.netty.handler.codec.mqtt.MqttFixedHeader;
  27 +import io.netty.handler.codec.mqtt.MqttMessage;
  28 +import io.netty.handler.codec.mqtt.MqttMessageType;
  29 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
  30 +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
27 import lombok.extern.slf4j.Slf4j; 31 import lombok.extern.slf4j.Slf4j;
28 import org.springframework.stereotype.Component; 32 import org.springframework.stereotype.Component;
29 import org.springframework.util.StringUtils; 33 import org.springframework.util.StringUtils;
30 -import org.thingsboard.server.common.data.kv.AttributeKvEntry;  
31 -import org.thingsboard.server.common.data.kv.KvEntry;  
32 -import org.thingsboard.server.common.msg.core.*;  
33 -import org.thingsboard.server.common.msg.kv.AttributesKVMsg;  
34 -import org.thingsboard.server.common.msg.session.*;  
35 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 34 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
36 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 35 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
37 import org.thingsboard.server.gen.transport.TransportProtos; 36 import org.thingsboard.server.gen.transport.TransportProtos;
38 import org.thingsboard.server.transport.mqtt.MqttTopics; 37 import org.thingsboard.server.transport.mqtt.MqttTopics;
39 -import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;  
40 -import org.thingsboard.server.transport.mqtt.MqttTransportHandler;  
41 import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext; 38 import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext;
42 39
43 import java.nio.charset.Charset; 40 import java.nio.charset.Charset;
44 import java.util.Arrays; 41 import java.util.Arrays;
45 import java.util.HashSet; 42 import java.util.HashSet;
46 -import java.util.List;  
47 import java.util.Optional; 43 import java.util.Optional;
48 import java.util.Set; 44 import java.util.Set;
49 import java.util.UUID; 45 import java.util.UUID;
@@ -169,45 +165,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -169,45 +165,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
169 } 165 }
170 166
171 @Override 167 @Override
172 - public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {  
173 - return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse))); 168 + public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
  169 + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_RPC_TOPIC, JsonConverter.toGatewayJson(deviceName, rpcRequest)));
174 } 170 }
175 171
176 - private MqttMessage convertResponseMsg(MqttDeviceAwareSessionContext ctx, ToDeviceMsg msg,  
177 - ResponseMsg<?> responseMsg, Optional<Exception> responseError) throws AdaptorException {  
178 - MqttMessage result = null;  
179 - SessionMsgType requestMsgType = responseMsg.getRequestMsgType();  
180 - Integer requestId = responseMsg.getRequestId();  
181 - if (requestId >= 0) {  
182 - if (requestMsgType == SessionMsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == SessionMsgType.POST_TELEMETRY_REQUEST) {  
183 - result = MqttTransportHandler.createMqttPubAckMsg(requestId);  
184 - } else if (requestMsgType == SessionMsgType.GET_ATTRIBUTES_REQUEST) {  
185 - GetAttributesResponse response = (GetAttributesResponse) msg;  
186 - Optional<AttributesKVMsg> responseData = response.getData();  
187 - if (response.isSuccess() && responseData.isPresent()) {  
188 - result = createMqttPublishMsg(ctx,  
189 - MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId,  
190 - responseData.get(), true);  
191 - } else {  
192 - if (responseError.isPresent()) {  
193 - throw new AdaptorException(responseError.get());  
194 - }  
195 - }  
196 - }  
197 - }  
198 - return result;  
199 - }  
200 -  
201 - private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, AttributesKVMsg msg, boolean asMap) {  
202 - return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, asMap));  
203 - }  
204 -  
205 - private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, ToDeviceRpcRequestMsg msg) {  
206 - return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false));  
207 - }  
208 -  
209 - private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) {  
210 - return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg)); 172 + @Override
  173 + public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) {
  174 + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse)));
211 } 175 }
212 176
213 private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, JsonElement json) { 177 private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, JsonElement json) {
@@ -219,39 +183,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -219,39 +183,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
219 return new MqttPublishMessage(mqttFixedHeader, header, payload); 183 return new MqttPublishMessage(mqttFixedHeader, header, payload);
220 } 184 }
221 185
222 - private FromDeviceMsg convertToGetAttributesRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {  
223 - String topicName = inbound.variableHeader().topicName();  
224 - try {  
225 - Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));  
226 - String payload = inbound.payload().toString(UTF8);  
227 - JsonElement requestBody = new JsonParser().parse(payload);  
228 - Set<String> clientKeys = toStringSet(requestBody, "clientKeys");  
229 - Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");  
230 - if (clientKeys == null && sharedKeys == null) {  
231 - return new BasicGetAttributesRequest(requestId);  
232 - } else {  
233 - return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys);  
234 - }  
235 - } catch (RuntimeException e) {  
236 - log.warn("Failed to decode get attributes request", e);  
237 - throw new AdaptorException(e);  
238 - }  
239 - }  
240 -  
241 - private FromDeviceMsg convertToRpcCommandResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {  
242 - String topicName = inbound.variableHeader().topicName();  
243 - try {  
244 - Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length()));  
245 - String payload = inbound.payload().toString(UTF8);  
246 - return new ToDeviceRpcResponseMsg(  
247 - requestId,  
248 - payload);  
249 - } catch (RuntimeException e) {  
250 - log.warn("Failed to decode get attributes request", e);  
251 - throw new AdaptorException(e);  
252 - }  
253 - }  
254 -  
255 private Set<String> toStringSet(JsonElement requestBody, String name) { 186 private Set<String> toStringSet(JsonElement requestBody, String name) {
256 JsonElement element = requestBody.getAsJsonObject().get(name); 187 JsonElement element = requestBody.getAsJsonObject().get(name);
257 if (element != null) { 188 if (element != null) {
@@ -261,24 +192,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -261,24 +192,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
261 } 192 }
262 } 193 }
263 194
264 - private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {  
265 - String payload = validatePayload(ctx.getSessionId(), inbound.payload());  
266 - try {  
267 - return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().packetId());  
268 - } catch (IllegalStateException | JsonSyntaxException ex) {  
269 - throw new AdaptorException(ex);  
270 - }  
271 - }  
272 -  
273 - private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {  
274 - String payload = validatePayload(ctx.getSessionId(), inbound.payload());  
275 - try {  
276 - return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().packetId());  
277 - } catch (IllegalStateException | JsonSyntaxException ex) {  
278 - throw new AdaptorException(ex);  
279 - }  
280 - }  
281 -  
282 public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException { 195 public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
283 String payload = validatePayload(sessionId, payloadData); 196 String payload = validatePayload(sessionId, payloadData);
284 try { 197 try {
@@ -288,7 +201,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @@ -288,7 +201,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
288 } 201 }
289 } 202 }
290 203
291 - public static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException { 204 + private static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException {
292 try { 205 try {
293 String payload = payloadData.toString(UTF8); 206 String payload = payloadData.toString(UTF8);
294 if (payload == null) { 207 if (payload == null) {
@@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt.adaptors; @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt.adaptors;
18 import io.netty.handler.codec.mqtt.MqttMessage; 18 import io.netty.handler.codec.mqtt.MqttMessage;
19 import io.netty.handler.codec.mqtt.MqttPublishMessage; 19 import io.netty.handler.codec.mqtt.MqttPublishMessage;
20 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 20 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  21 +import org.thingsboard.server.gen.transport.TransportProtos;
21 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 22 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
22 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; 23 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
23 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
@@ -56,5 +57,7 @@ public interface MqttTransportAdaptor { @@ -56,5 +57,7 @@ public interface MqttTransportAdaptor {
56 57
57 Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; 58 Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
58 59
  60 + Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
  61 +
59 Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException; 62 Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException;
60 } 63 }
@@ -15,43 +15,14 @@ @@ -15,43 +15,14 @@
15 */ 15 */
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 -import com.google.gson.Gson;  
19 -import com.google.gson.JsonElement;  
20 -import com.google.gson.JsonObject;  
21 -import com.sun.xml.internal.bind.v2.TODO;  
22 -import io.netty.buffer.ByteBuf;  
23 -import io.netty.buffer.ByteBufAllocator;  
24 -import io.netty.buffer.UnpooledByteBufAllocator;  
25 -import io.netty.handler.codec.mqtt.MqttFixedHeader;  
26 -import io.netty.handler.codec.mqtt.MqttMessage;  
27 -import io.netty.handler.codec.mqtt.MqttMessageType;  
28 -import io.netty.handler.codec.mqtt.MqttPublishMessage;  
29 -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;  
30 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
31 -import org.thingsboard.server.common.data.kv.AttributeKvEntry;  
32 -import org.thingsboard.server.common.data.kv.KvEntry;  
33 -import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;  
34 -import org.thingsboard.server.common.msg.core.GetAttributesResponse;  
35 -import org.thingsboard.server.common.msg.core.ResponseMsg;  
36 -import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;  
37 -import org.thingsboard.server.common.msg.kv.AttributesKVMsg;  
38 -import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;  
39 -import org.thingsboard.server.common.msg.session.SessionMsgType;  
40 -import org.thingsboard.server.common.msg.session.ToDeviceMsg;  
41 import org.thingsboard.server.common.transport.SessionMsgListener; 19 import org.thingsboard.server.common.transport.SessionMsgListener;
42 -import org.thingsboard.server.common.transport.adaptor.JsonConverter;  
43 import org.thingsboard.server.gen.transport.TransportProtos; 20 import org.thingsboard.server.gen.transport.TransportProtos;
44 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; 21 import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
45 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; 22 import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
46 -import org.thingsboard.server.transport.mqtt.MqttTopics;  
47 -import org.thingsboard.server.transport.mqtt.MqttTransportHandler;  
48 23
49 -import java.nio.charset.Charset;  
50 -import java.util.List;  
51 -import java.util.Optional;  
52 import java.util.UUID; 24 import java.util.UUID;
53 import java.util.concurrent.ConcurrentMap; 25 import java.util.concurrent.ConcurrentMap;
54 -import java.util.concurrent.atomic.AtomicInteger;  
55 26
56 /** 27 /**
57 * Created by ashvayka on 19.01.17. 28 * Created by ashvayka on 19.01.17.
@@ -59,15 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -59,15 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger;
59 @Slf4j 30 @Slf4j
60 public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { 31 public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
61 32
62 - private static final Gson GSON = new Gson();  
63 - private static final Charset UTF8 = Charset.forName("UTF-8");  
64 - private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);  
65 -  
66 private final GatewaySessionCtx parent; 33 private final GatewaySessionCtx parent;
67 private final UUID sessionId; 34 private final UUID sessionId;
68 private final SessionInfoProto sessionInfo; 35 private final SessionInfoProto sessionInfo;
69 - private volatile boolean closed;  
70 - private AtomicInteger msgIdSeq = new AtomicInteger(0);  
71 36
72 public GatewayDeviceSessionCtx(GatewaySessionCtx parent, DeviceInfoProto deviceInfo, ConcurrentMap<String, Integer> mqttQoSMap) { 37 public GatewayDeviceSessionCtx(GatewaySessionCtx parent, DeviceInfoProto deviceInfo, ConcurrentMap<String, Integer> mqttQoSMap) {
73 super(mqttQoSMap); 38 super(mqttQoSMap);
@@ -90,109 +55,9 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -90,109 +55,9 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
90 return sessionId; 55 return sessionId;
91 } 56 }
92 57
93 - private Optional<MqttMessage> getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) {  
94 - ToDeviceMsg msg = sessionMsg.getMsg();  
95 - switch (msg.getSessionMsgType()) {  
96 - case STATUS_CODE_RESPONSE:  
97 - ResponseMsg<?> responseMsg = (ResponseMsg) msg;  
98 - if (responseMsg.isSuccess()) {  
99 - SessionMsgType requestMsgType = responseMsg.getRequestMsgType();  
100 - Integer requestId = responseMsg.getRequestId();  
101 - if (requestId >= 0 && (requestMsgType == SessionMsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == SessionMsgType.POST_TELEMETRY_REQUEST)) {  
102 - return Optional.of(MqttTransportHandler.createMqttPubAckMsg(requestId));  
103 - }  
104 - }  
105 - break;  
106 - case GET_ATTRIBUTES_RESPONSE:  
107 - GetAttributesResponse response = (GetAttributesResponse) msg;  
108 - if (response.isSuccess()) {  
109 - return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, response));  
110 - } else {  
111 - //TODO: push error handling to the gateway  
112 - }  
113 - break;  
114 - case ATTRIBUTES_UPDATE_NOTIFICATION:  
115 - AttributesUpdateNotification notification = (AttributesUpdateNotification) msg;  
116 - return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData()));  
117 - case TO_DEVICE_RPC_REQUEST:  
118 - ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;  
119 - return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_RPC_TOPIC, rpcRequest));  
120 - default:  
121 - break;  
122 - }  
123 - return Optional.empty();  
124 - }  
125 -  
126 - private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) {  
127 - JsonObject result = new JsonObject();  
128 - result.addProperty("id", response.getRequestId());  
129 -// result.addProperty(DEVICE_PROPERTY, device.getName());  
130 - Optional<AttributesKVMsg> responseData = response.getData();  
131 - if (responseData.isPresent()) {  
132 - AttributesKVMsg msg = responseData.get();  
133 - if (msg.getClientAttributes() != null) {  
134 - addValues(result, msg.getClientAttributes());  
135 - }  
136 - if (msg.getSharedAttributes() != null) {  
137 - addValues(result, msg.getSharedAttributes());  
138 - }  
139 - }  
140 - return createMqttPublishMsg(topic, result);  
141 - }  
142 -  
143 - private void addValues(JsonObject result, List<AttributeKvEntry> kvList) {  
144 - if (kvList.size() == 1) {  
145 - addValueToJson(result, "value", kvList.get(0));  
146 - } else {  
147 - JsonObject values;  
148 - if (result.has("values")) {  
149 - values = result.get("values").getAsJsonObject();  
150 - } else {  
151 - values = new JsonObject();  
152 - result.add("values", values);  
153 - }  
154 - kvList.forEach(value -> addValueToJson(values, value.getKey(), value));  
155 - }  
156 - }  
157 -  
158 - private void addValueToJson(JsonObject json, String name, KvEntry entry) {  
159 - switch (entry.getDataType()) {  
160 - case BOOLEAN:  
161 - entry.getBooleanValue().ifPresent(aBoolean -> json.addProperty(name, aBoolean));  
162 - break;  
163 - case STRING:  
164 - entry.getStrValue().ifPresent(aString -> json.addProperty(name, aString));  
165 - break;  
166 - case DOUBLE:  
167 - entry.getDoubleValue().ifPresent(aDouble -> json.addProperty(name, aDouble));  
168 - break;  
169 - case LONG:  
170 - entry.getLongValue().ifPresent(aLong -> json.addProperty(name, aLong));  
171 - break;  
172 - }  
173 - }  
174 -  
175 - private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) {  
176 - JsonObject result = new JsonObject();  
177 -// result.addProperty(DEVICE_PROPERTY, device.getName());  
178 - result.add("data", JsonConverter.toJson(data, false));  
179 - return createMqttPublishMsg(topic, result);  
180 - }  
181 -  
182 - private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) {  
183 - JsonObject result = new JsonObject();  
184 -// result.addProperty(DEVICE_PROPERTY, device.getName());  
185 - result.add("data", JsonConverter.toJson(data, true));  
186 - return createMqttPublishMsg(topic, result);  
187 - }  
188 -  
189 - private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) {  
190 - MqttFixedHeader mqttFixedHeader =  
191 - new MqttFixedHeader(MqttMessageType.PUBLISH, false, getQoSForTopic(topic), false, 0);  
192 - MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet());  
193 - ByteBuf payload = ALLOCATOR.buffer();  
194 - payload.writeBytes(GSON.toJson(json).getBytes(UTF8));  
195 - return new MqttPublishMessage(mqttFixedHeader, header, payload); 58 + @Override
  59 + public int nextMsgId() {
  60 + return parent.nextMsgId();
196 } 61 }
197 62
198 SessionInfoProto getSessionInfo() { 63 SessionInfoProto getSessionInfo() {
@@ -223,12 +88,16 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -223,12 +88,16 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
223 } 88 }
224 89
225 @Override 90 @Override
226 - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg toDeviceRequest) {  
227 - 91 + public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) {
  92 + try {
  93 + parent.getAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush);
  94 + } catch (Exception e) {
  95 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  96 + }
228 } 97 }
229 98
230 @Override 99 @Override
231 public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) { 100 public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
232 - TODO 101 + // This feature is not supported in the TB IoT Gateway yet.
233 } 102 }
234 } 103 }
@@ -52,6 +52,7 @@ import java.util.Set; @@ -52,6 +52,7 @@ import java.util.Set;
52 import java.util.UUID; 52 import java.util.UUID;
53 import java.util.concurrent.ConcurrentHashMap; 53 import java.util.concurrent.ConcurrentHashMap;
54 import java.util.concurrent.ConcurrentMap; 54 import java.util.concurrent.ConcurrentMap;
  55 +import java.util.concurrent.atomic.AtomicInteger;
55 56
56 /** 57 /**
57 * Created by ashvayka on 19.01.17. 58 * Created by ashvayka on 19.01.17.
@@ -70,10 +71,12 @@ public class GatewaySessionCtx { @@ -70,10 +71,12 @@ public class GatewaySessionCtx {
70 private final Map<String, GatewayDeviceSessionCtx> devices; 71 private final Map<String, GatewayDeviceSessionCtx> devices;
71 private final ConcurrentMap<String, Integer> mqttQoSMap; 72 private final ConcurrentMap<String, Integer> mqttQoSMap;
72 private final ChannelHandlerContext channel; 73 private final ChannelHandlerContext channel;
  74 + private final DeviceSessionCtx deviceSessionCtx;
73 75
74 public GatewaySessionCtx(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) { 76 public GatewaySessionCtx(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
75 this.context = context; 77 this.context = context;
76 this.transportService = context.getTransportService(); 78 this.transportService = context.getTransportService();
  79 + this.deviceSessionCtx = deviceSessionCtx;
77 this.gateway = deviceSessionCtx.getDeviceInfo(); 80 this.gateway = deviceSessionCtx.getDeviceInfo();
78 this.sessionId = sessionId; 81 this.sessionId = sessionId;
79 this.devices = new ConcurrentHashMap<>(); 82 this.devices = new ConcurrentHashMap<>();
@@ -357,4 +360,8 @@ public class GatewaySessionCtx { @@ -357,4 +360,8 @@ public class GatewaySessionCtx {
357 public MqttTransportAdaptor getAdaptor() { 360 public MqttTransportAdaptor getAdaptor() {
358 return context.getAdaptor(); 361 return context.getAdaptor();
359 } 362 }
  363 +
  364 + public int nextMsgId() {
  365 + return deviceSessionCtx.nextMsgId();
  366 + }
360 } 367 }