Commit 40dfc60f58297f52f55b562ee0393e4ce75f5ba5

Authored by Andrew Shvayka
Committed by GitHub
2 parents 767a49d4 d259920d

Merge pull request #64 from thingsboard/feature/TB-41

TB-41: Add MQTT API for Gateway to support attribute updates and server-side RPC
@@ -47,7 +47,7 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib @@ -47,7 +47,7 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib
47 47
48 @Override 48 @Override
49 public Optional<Set<String>> getClientAttributeNames() { 49 public Optional<Set<String>> getClientAttributeNames() {
50 - return Optional.of(clientKeys); 50 + return Optional.ofNullable(clientKeys);
51 } 51 }
52 52
53 @Override 53 @Override
@@ -32,10 +32,13 @@ public class MqttTopics { @@ -32,10 +32,13 @@ public class MqttTopics {
32 public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes"; 32 public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes";
33 33
34 public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway"; 34 public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway";
35 - public static final String GATEWAY_CONNECT_TOPIC = "v1/gateway/connect";  
36 - public static final String GATEWAY_DISCONNECT_TOPIC = "v1/gateway/disconnect";  
37 - public static final String GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes";  
38 - public static final String GATEWAY_TELEMETRY_TOPIC = "v1/gateway/telemetry"; 35 + public static final String GATEWAY_CONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/connect";
  36 + public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/disconnect";
  37 + public static final String GATEWAY_ATTRIBUTES_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes";
  38 + public static final String GATEWAY_TELEMETRY_TOPIC = BASE_GATEWAY_API_TOPIC + "/telemetry";
  39 + public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc";
  40 + public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/request";
  41 + public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/response";
39 42
40 43
41 private MqttTopics() { 44 private MqttTopics() {
@@ -134,6 +134,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -134,6 +134,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
134 gatewaySessionCtx.onDeviceTelemetry(mqttMsg); 134 gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
135 } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { 135 } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
136 gatewaySessionCtx.onDeviceAttributes(mqttMsg); 136 gatewaySessionCtx.onDeviceAttributes(mqttMsg);
  137 + } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) {
  138 + gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
  139 + } else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
  140 + gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
137 } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { 141 } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
138 gatewaySessionCtx.onDeviceConnect(mqttMsg); 142 gatewaySessionCtx.onDeviceConnect(mqttMsg);
139 } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) { 143 } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) {
1 -/**  
2 - * Copyright © 2016-2017 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.transport.mqtt.adaptors;  
17 -  
18 -import io.netty.handler.codec.mqtt.MqttMessage;  
19 -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;  
20 -import org.thingsboard.server.common.msg.session.MsgType;  
21 -import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;  
22 -import org.thingsboard.server.common.transport.adaptor.AdaptorException;  
23 -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;  
24 -  
25 -import java.util.Optional;  
26 -  
27 -/**  
28 - * Created by ashvayka on 19.01.17.  
29 - */  
30 -public interface MqttGatewayAdaptor {  
31 -  
32 - AdaptorToSessionActorMsg convertToActorMsg(GatewaySessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException;  
33 -  
34 - Optional<MqttMessage> convertToAdaptorMsg(GatewaySessionCtx ctx, SessionActorToAdaptorMsg msg) throws AdaptorException;  
35 -  
36 -}  
@@ -15,25 +15,45 @@ @@ -15,25 +15,45 @@
15 */ 15 */
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 -import io.netty.handler.codec.mqtt.MqttMessage; 18 +import com.google.gson.Gson;
  19 +import com.google.gson.JsonElement;
  20 +import com.google.gson.JsonObject;
  21 +import io.netty.buffer.ByteBuf;
  22 +import io.netty.buffer.ByteBufAllocator;
  23 +import io.netty.buffer.UnpooledByteBufAllocator;
  24 +import io.netty.handler.codec.mqtt.*;
19 import org.thingsboard.server.common.data.Device; 25 import org.thingsboard.server.common.data.Device;
20 import org.thingsboard.server.common.data.id.SessionId; 26 import org.thingsboard.server.common.data.id.SessionId;
  27 +import org.thingsboard.server.common.data.kv.KvEntry;
  28 +import org.thingsboard.server.common.msg.core.AttributesUpdateNotification;
  29 +import org.thingsboard.server.common.msg.core.GetAttributesResponse;
21 import org.thingsboard.server.common.msg.core.ResponseMsg; 30 import org.thingsboard.server.common.msg.core.ResponseMsg;
  31 +import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
  32 +import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
22 import org.thingsboard.server.common.msg.session.*; 33 import org.thingsboard.server.common.msg.session.*;
23 import org.thingsboard.server.common.msg.session.ex.SessionException; 34 import org.thingsboard.server.common.msg.session.ex.SessionException;
  35 +import org.thingsboard.server.common.transport.adaptor.JsonConverter;
24 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 36 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  37 +import org.thingsboard.server.transport.mqtt.MqttTopics;
25 import org.thingsboard.server.transport.mqtt.MqttTransportHandler; 38 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
26 39
  40 +import java.nio.charset.Charset;
27 import java.util.Optional; 41 import java.util.Optional;
  42 +import java.util.concurrent.atomic.AtomicInteger;
28 43
29 /** 44 /**
30 * Created by ashvayka on 19.01.17. 45 * Created by ashvayka on 19.01.17.
31 */ 46 */
32 public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { 47 public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
33 48
  49 + private static final Gson GSON = new Gson();
  50 + private static final Charset UTF8 = Charset.forName("UTF-8");
  51 + private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
  52 +
34 private GatewaySessionCtx parent; 53 private GatewaySessionCtx parent;
35 private final MqttSessionId sessionId; 54 private final MqttSessionId sessionId;
36 private volatile boolean closed; 55 private volatile boolean closed;
  56 + private AtomicInteger msgIdSeq = new AtomicInteger(0);
37 57
38 public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) { 58 public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) {
39 super(parent.getProcessor(), parent.getAuthService(), device); 59 super(parent.getProcessor(), parent.getAuthService(), device);
@@ -70,6 +90,20 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { @@ -70,6 +90,20 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
70 } 90 }
71 } 91 }
72 break; 92 break;
  93 + case GET_ATTRIBUTES_RESPONSE:
  94 + GetAttributesResponse response = (GetAttributesResponse) msg;
  95 + if (response.isSuccess()) {
  96 + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, response));
  97 + } else {
  98 + //TODO: push error handling to the gateway
  99 + }
  100 + break;
  101 + case ATTRIBUTES_UPDATE_NOTIFICATION:
  102 + AttributesUpdateNotification notification = (AttributesUpdateNotification) msg;
  103 + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData()));
  104 + case TO_DEVICE_RPC_REQUEST:
  105 + ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg;
  106 + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_RPC_TOPIC, rpcRequest));
73 } 107 }
74 return Optional.empty(); 108 return Optional.empty();
75 } 109 }
@@ -92,4 +126,61 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { @@ -92,4 +126,61 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
92 public long getTimeout() { 126 public long getTimeout() {
93 return 0; 127 return 0;
94 } 128 }
  129 +
  130 + private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) {
  131 + JsonObject result = new JsonObject();
  132 + result.addProperty("id", response.getRequestId());
  133 + result.addProperty("device", device.getName());
  134 + if (response.getData().isPresent()) {
  135 + AttributesKVMsg msg = response.getData().get();
  136 + if (msg.getClientAttributes() != null) {
  137 + msg.getClientAttributes().forEach(v -> addValueToJson(result, "value", v));
  138 + }
  139 + if (msg.getSharedAttributes() != null) {
  140 + msg.getSharedAttributes().forEach(v -> addValueToJson(result, "value", v));
  141 + }
  142 + }
  143 + return createMqttPublishMsg(topic, result);
  144 + }
  145 +
  146 + private void addValueToJson(JsonObject json, String name, KvEntry entry) {
  147 + switch (entry.getDataType()) {
  148 + case BOOLEAN:
  149 + json.addProperty(name, entry.getBooleanValue().get());
  150 + break;
  151 + case STRING:
  152 + json.addProperty(name, entry.getStrValue().get());
  153 + break;
  154 + case DOUBLE:
  155 + json.addProperty(name, entry.getDoubleValue().get());
  156 + break;
  157 + case LONG:
  158 + json.addProperty(name, entry.getLongValue().get());
  159 + break;
  160 + }
  161 + }
  162 +
  163 + private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) {
  164 + JsonObject result = new JsonObject();
  165 + result.addProperty("device", device.getName());
  166 + result.add("data", JsonConverter.toJson(data, false));
  167 + return createMqttPublishMsg(topic, result);
  168 + }
  169 +
  170 + private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) {
  171 + JsonObject result = new JsonObject();
  172 + result.addProperty("device", device.getName());
  173 + result.add("data", JsonConverter.toJson(data, true));
  174 + return createMqttPublishMsg(topic, result);
  175 + }
  176 +
  177 + private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) {
  178 + MqttFixedHeader mqttFixedHeader =
  179 + new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
  180 + MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet());
  181 + ByteBuf payload = ALLOCATOR.buffer();
  182 + payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
  183 + return new MqttPublishMessage(mqttFixedHeader, header, payload);
  184 + }
  185 +
95 } 186 }
@@ -15,9 +15,10 @@ @@ -15,9 +15,10 @@
15 */ 15 */
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 -import com.google.gson.*;  
19 -import io.netty.buffer.ByteBufAllocator;  
20 -import io.netty.buffer.UnpooledByteBufAllocator; 18 +import com.google.gson.JsonArray;
  19 +import com.google.gson.JsonElement;
  20 +import com.google.gson.JsonObject;
  21 +import com.google.gson.JsonSyntaxException;
21 import io.netty.channel.ChannelHandlerContext; 22 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.handler.codec.mqtt.MqttMessage; 23 import io.netty.handler.codec.mqtt.MqttMessage;
23 import io.netty.handler.codec.mqtt.MqttPublishMessage; 24 import io.netty.handler.codec.mqtt.MqttPublishMessage;
@@ -26,9 +27,7 @@ import org.springframework.util.StringUtils; @@ -26,9 +27,7 @@ import org.springframework.util.StringUtils;
26 import org.thingsboard.server.common.data.Device; 27 import org.thingsboard.server.common.data.Device;
27 import org.thingsboard.server.common.data.id.SessionId; 28 import org.thingsboard.server.common.data.id.SessionId;
28 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; 29 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
29 -import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest;  
30 -import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest;  
31 -import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; 30 +import org.thingsboard.server.common.msg.core.*;
32 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; 31 import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
33 import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; 32 import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg;
34 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; 33 import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
@@ -40,7 +39,7 @@ import org.thingsboard.server.dao.device.DeviceService; @@ -40,7 +39,7 @@ import org.thingsboard.server.dao.device.DeviceService;
40 import org.thingsboard.server.transport.mqtt.MqttTransportHandler; 39 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
41 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; 40 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
42 41
43 -import java.nio.charset.Charset; 42 +import java.util.Collections;
44 import java.util.HashMap; 43 import java.util.HashMap;
45 import java.util.Map; 44 import java.util.Map;
46 import java.util.Optional; 45 import java.util.Optional;
@@ -54,10 +53,6 @@ import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.val @@ -54,10 +53,6 @@ import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.val
54 @Slf4j 53 @Slf4j
55 public class GatewaySessionCtx { 54 public class GatewaySessionCtx {
56 55
57 - private static final Gson GSON = new Gson();  
58 - private static final Charset UTF8 = Charset.forName("UTF-8");  
59 - private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);  
60 -  
61 private final Device gateway; 56 private final Device gateway;
62 private final SessionId gatewaySessionId; 57 private final SessionId gatewaySessionId;
63 private final SessionMsgProcessor processor; 58 private final SessionMsgProcessor processor;
@@ -84,7 +79,10 @@ public class GatewaySessionCtx { @@ -84,7 +79,10 @@ public class GatewaySessionCtx {
84 newDevice.setName(deviceName); 79 newDevice.setName(deviceName);
85 return deviceService.saveDevice(newDevice); 80 return deviceService.saveDevice(newDevice);
86 }); 81 });
87 - devices.put(deviceName, new GatewayDeviceSessionCtx(this, device)); 82 + GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
  83 + devices.put(deviceName, ctx);
  84 + processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
  85 + processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
88 ack(msg); 86 ack(msg);
89 } 87 }
90 88
@@ -127,6 +125,21 @@ public class GatewaySessionCtx { @@ -127,6 +125,21 @@ public class GatewaySessionCtx {
127 } 125 }
128 } 126 }
129 127
  128 + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {
  129 + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
  130 + if (json.isJsonObject()) {
  131 + JsonObject jsonObj = json.getAsJsonObject();
  132 + String deviceName = checkDeviceConnected(jsonObj.get("device").getAsString());
  133 + Integer requestId = jsonObj.get("id").getAsInt();
  134 + String data = jsonObj.get("data").getAsString();
  135 + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
  136 + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
  137 + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
  138 + } else {
  139 + throw new JsonSyntaxException("Can't parse value: " + json);
  140 + }
  141 + }
  142 +
130 public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { 143 public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
131 JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); 144 JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
132 int requestId = mqttMsg.variableHeader().messageId(); 145 int requestId = mqttMsg.variableHeader().messageId();
@@ -150,6 +163,29 @@ public class GatewaySessionCtx { @@ -150,6 +163,29 @@ public class GatewaySessionCtx {
150 } 163 }
151 } 164 }
152 165
  166 + public void onDeviceAttributesRequest(MqttPublishMessage mqttMsg) throws AdaptorException {
  167 + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
  168 + if (json.isJsonObject()) {
  169 + JsonObject jsonObj = json.getAsJsonObject();
  170 + int requestId = jsonObj.get("id").getAsInt();
  171 + String deviceName = jsonObj.get("device").getAsString();
  172 + boolean clientScope = jsonObj.get("client").getAsBoolean();
  173 + String key = jsonObj.get("key").getAsString();
  174 +
  175 + BasicGetAttributesRequest request;
  176 + if (clientScope) {
  177 + request = new BasicGetAttributesRequest(requestId, Collections.singleton(key), null);
  178 + } else {
  179 + request = new BasicGetAttributesRequest(requestId, null, Collections.singleton(key));
  180 + }
  181 + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
  182 + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
  183 + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
  184 + } else {
  185 + throw new JsonSyntaxException("Can't parse value: " + json);
  186 + }
  187 + }
  188 +
153 private String checkDeviceConnected(String deviceName) { 189 private String checkDeviceConnected(String deviceName) {
154 if (!devices.containsKey(deviceName)) { 190 if (!devices.containsKey(deviceName)) {
155 throw new RuntimeException("Device is not connected!"); 191 throw new RuntimeException("Device is not connected!");
@@ -190,4 +226,5 @@ public class GatewaySessionCtx { @@ -190,4 +226,5 @@ public class GatewaySessionCtx {
190 protected void writeAndFlush(MqttMessage mqttMessage) { 226 protected void writeAndFlush(MqttMessage mqttMessage) {
191 channel.writeAndFlush(mqttMessage); 227 channel.writeAndFlush(mqttMessage);
192 } 228 }
  229 +
193 } 230 }