Commit 29ed792a00ea151df21648eafbce04f688da5f7e

Authored by Andrew Shvayka
1 parent 516d8791

Mqtt Sub QoS improvement

... ... @@ -73,4 +73,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
73 73 public Device getDevice() {
74 74 return device;
75 75 }
  76 +
  77 +
76 78 }
... ...
... ... @@ -53,6 +53,7 @@ import java.util.ArrayList;
53 53 import java.util.List;
54 54 import java.util.Map;
55 55 import java.util.concurrent.ConcurrentHashMap;
  56 +import java.util.concurrent.ConcurrentMap;
56 57
57 58 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
58 59 import static io.netty.handler.codec.mqtt.MqttMessageType.*;
... ... @@ -77,12 +78,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
77 78 private final RelationService relationService;
78 79 private final QuotaService quotaService;
79 80 private final SslHandler sslHandler;
  81 + private final ConcurrentMap<String, Integer> mqttQoSMap;
  82 +
80 83 private volatile boolean connected;
81 84 private volatile InetSocketAddress address;
82 85 private volatile GatewaySessionCtx gatewaySessionCtx;
83 86
84   - private Map<String,MqttQoS> mqttQoSMap = new ConcurrentHashMap<>();
85   -
86 87 public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
87 88 MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
88 89 this.processor = processor;
... ... @@ -90,7 +91,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
90 91 this.relationService = relationService;
91 92 this.authService = authService;
92 93 this.adaptor = adaptor;
93   - this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor);
  94 + this.mqttQoSMap = new ConcurrentHashMap<>();
  95 + this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor, mqttQoSMap);
94 96 this.sessionId = deviceSessionCtx.getSessionId().toUidStr();
95 97 this.sslHandler = sslHandler;
96 98 this.quotaService = quotaService;
... ... @@ -170,18 +172,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
170 172
171 173 private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
172 174 try {
173   - if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) {
174   - gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
175   - } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
176   - gatewaySessionCtx.onDeviceAttributes(mqttMsg);
177   - } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) {
178   - gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
179   - } else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
180   - gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
181   - } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) {
182   - gatewaySessionCtx.onDeviceConnect(mqttMsg);
183   - } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) {
184   - gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
  175 + switch (topicName) {
  176 + case GATEWAY_TELEMETRY_TOPIC:
  177 + gatewaySessionCtx.onDeviceTelemetry(mqttMsg);
  178 + break;
  179 + case GATEWAY_ATTRIBUTES_TOPIC:
  180 + gatewaySessionCtx.onDeviceAttributes(mqttMsg);
  181 + break;
  182 + case GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
  183 + gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg);
  184 + break;
  185 + case GATEWAY_RPC_TOPIC:
  186 + gatewaySessionCtx.onDeviceRpcResponse(mqttMsg);
  187 + break;
  188 + case GATEWAY_CONNECT_TOPIC:
  189 + gatewaySessionCtx.onDeviceConnect(mqttMsg);
  190 + break;
  191 + case GATEWAY_DISCONNECT_TOPIC:
  192 + gatewaySessionCtx.onDeviceDisconnect(mqttMsg);
  193 + break;
185 194 }
186 195 } catch (RuntimeException | AdaptorException e) {
187 196 log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
... ... @@ -229,40 +238,53 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
229 238 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
230 239 List<Integer> grantedQoSList = new ArrayList<>();
231 240 for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
232   - String topicName = subscription.topicName();
233   - //TODO: handle this qos level.
  241 + String topic = subscription.topicName();
234 242 MqttQoS reqQoS = subscription.qualityOfService();
235   - mqttQoSMap.put(topicName, reqQoS);
236 243 try {
237   - if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
238   - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
239   - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
240   - grantedQoSList.add(getMinSupportedQos(reqQoS));
241   - } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
242   - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
243   - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
244   - grantedQoSList.add(getMinSupportedQos(reqQoS));
245   - } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) {
246   - grantedQoSList.add(getMinSupportedQos(reqQoS));
247   - } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
248   - deviceSessionCtx.setAllowAttributeResponses();
249   - grantedQoSList.add(getMinSupportedQos(reqQoS));
250   - } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
251   - grantedQoSList.add(getMinSupportedQos(reqQoS));
252   - }else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
253   - grantedQoSList.add(getMinSupportedQos(reqQoS));
254   - } else {
255   - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
256   - grantedQoSList.add(FAILURE.value());
  244 + switch (topic) {
  245 + case DEVICE_ATTRIBUTES_TOPIC: {
  246 + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
  247 + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
  248 + registerSubQoS(topic, grantedQoSList, reqQoS);
  249 + break;
  250 + }
  251 + case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
  252 + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
  253 + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
  254 + registerSubQoS(topic, grantedQoSList, reqQoS);
  255 + break;
  256 + }
  257 + case DEVICE_RPC_RESPONSE_SUB_TOPIC:
  258 + registerSubQoS(topic, grantedQoSList, reqQoS);
  259 + break;
  260 + case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
  261 + deviceSessionCtx.setAllowAttributeResponses();
  262 + registerSubQoS(topic, grantedQoSList, reqQoS);
  263 + break;
  264 + case GATEWAY_ATTRIBUTES_TOPIC:
  265 + registerSubQoS(topic, grantedQoSList, reqQoS);
  266 + break;
  267 + case GATEWAY_RPC_TOPIC:
  268 + registerSubQoS(topic, grantedQoSList, reqQoS);
  269 + break;
  270 + default:
  271 + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
  272 + grantedQoSList.add(FAILURE.value());
  273 + break;
257 274 }
258 275 } catch (AdaptorException e) {
259   - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
  276 + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
260 277 grantedQoSList.add(FAILURE.value());
261 278 }
262 279 }
263 280 ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
264 281 }
265 282
  283 + private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
  284 + grantedQoSList.add(getMinSupportedQos(reqQoS));
  285 + mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
  286 + }
  287 +
266 288 private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
267 289 if (!checkConnected(ctx)) {
268 290 return;
... ... @@ -271,14 +293,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
271 293 for (String topicName : mqttMsg.payload().topics()) {
272 294 mqttQoSMap.remove(topicName);
273 295 try {
274   - if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
275   - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
276   - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
277   - } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) {
278   - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
279   - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
280   - } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) {
281   - deviceSessionCtx.setDisallowAttributeResponses();
  296 + switch (topicName) {
  297 + case DEVICE_ATTRIBUTES_TOPIC: {
  298 + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
  299 + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
  300 + break;
  301 + }
  302 + case DEVICE_RPC_REQUESTS_SUB_TOPIC: {
  303 + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg);
  304 + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg));
  305 + break;
  306 + }
  307 + case DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
  308 + deviceSessionCtx.setDisallowAttributeResponses();
  309 + break;
282 310 }
283 311 } catch (AdaptorException e) {
284 312 log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
... ...
... ... @@ -170,7 +170,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
170 170
171 171 private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
172 172 MqttFixedHeader mqttFixedHeader =
173   - new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
  173 + new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0);
174 174 MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
175 175 ByteBuf payload = ALLOCATOR.buffer();
176 176 payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
... ...
... ... @@ -30,13 +30,15 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService;
30 30 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
31 31 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
32 32
  33 +import java.util.Map;
  34 +import java.util.concurrent.ConcurrentMap;
33 35 import java.util.concurrent.atomic.AtomicInteger;
34 36
35 37 /**
36 38 * @author Andrew Shvayka
37 39 */
38 40 @Slf4j
39   -public class DeviceSessionCtx extends DeviceAwareSessionContext {
  41 +public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
40 42
41 43 private final MqttTransportAdaptor adaptor;
42 44 private final MqttSessionId sessionId;
... ... @@ -44,8 +46,8 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext {
44 46 private volatile boolean allowAttributeResponses;
45 47 private AtomicInteger msgIdSeq = new AtomicInteger(0);
46 48
47   - public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor) {
48   - super(processor, authService);
  49 + public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) {
  50 + super(processor, authService, mqttQoSMap);
49 51 this.adaptor = adaptor;
50 52 this.sessionId = new MqttSessionId();
51 53 }
... ...
... ... @@ -38,13 +38,15 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
38 38
39 39 import java.nio.charset.Charset;
40 40 import java.util.List;
  41 +import java.util.Map;
41 42 import java.util.Optional;
  43 +import java.util.concurrent.ConcurrentMap;
42 44 import java.util.concurrent.atomic.AtomicInteger;
43 45
44 46 /**
45 47 * Created by ashvayka on 19.01.17.
46 48 */
47   -public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
  49 +public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
48 50
49 51 private static final Gson GSON = new Gson();
50 52 private static final Charset UTF8 = Charset.forName("UTF-8");
... ... @@ -56,8 +58,8 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
56 58 private volatile boolean closed;
57 59 private AtomicInteger msgIdSeq = new AtomicInteger(0);
58 60
59   - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) {
60   - super(parent.getProcessor(), parent.getAuthService(), device);
  61 + public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
  62 + super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap);
61 63 this.parent = parent;
62 64 this.sessionId = new MqttSessionId();
63 65 }
... ... @@ -195,7 +197,7 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
195 197
196 198 private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) {
197 199 MqttFixedHeader mqttFixedHeader =
198   - new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
  200 + new MqttFixedHeader(MqttMessageType.PUBLISH, false, getQoSForTopic(topic), false, 0);
199 201 MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet());
200 202 ByteBuf payload = ALLOCATOR.buffer();
201 203 payload.writeBytes(GSON.toJson(json).getBytes(UTF8));
... ...
... ... @@ -43,6 +43,7 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
43 43 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
44 44
45 45 import java.util.*;
  46 +import java.util.concurrent.ConcurrentMap;
46 47 import java.util.stream.Collectors;
47 48
48 49 import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
... ... @@ -63,6 +64,7 @@ public class GatewaySessionCtx {
63 64 private final DeviceAuthService authService;
64 65 private final RelationService relationService;
65 66 private final Map<String, GatewayDeviceSessionCtx> devices;
  67 + private final ConcurrentMap<String, Integer> mqttQoSMap;
66 68 private ChannelHandlerContext channel;
67 69
68 70 public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
... ... @@ -73,6 +75,7 @@ public class GatewaySessionCtx {
73 75 this.gateway = gatewaySessionCtx.getDevice();
74 76 this.gatewaySessionId = gatewaySessionCtx.getSessionId();
75 77 this.devices = new HashMap<>();
  78 + this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
76 79 }
77 80
78 81 public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
... ... @@ -96,7 +99,7 @@ public class GatewaySessionCtx {
96 99 relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
97 100 processor.onDeviceAdded(device);
98 101 }
99   - GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device);
  102 + GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap);
100 103 devices.put(deviceName, ctx);
101 104 log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
102 105 processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.mqtt.session;
  17 +
  18 +import io.netty.handler.codec.mqtt.MqttQoS;
  19 +import org.thingsboard.server.common.data.Device;
  20 +import org.thingsboard.server.common.transport.SessionMsgProcessor;
  21 +import org.thingsboard.server.common.transport.auth.DeviceAuthService;
  22 +import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  23 +
  24 +import java.util.Map;
  25 +import java.util.concurrent.ConcurrentMap;
  26 +
  27 +/**
  28 + * Created by ashvayka on 30.08.18.
  29 + */
  30 +public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
  31 +
  32 + private final ConcurrentMap<String, Integer> mqttQoSMap;
  33 +
  34 + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<String, Integer> mqttQoSMap) {
  35 + super(processor, authService);
  36 + this.mqttQoSMap = mqttQoSMap;
  37 + }
  38 +
  39 + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
  40 + super(processor, authService, device);
  41 + this.mqttQoSMap = mqttQoSMap;
  42 + }
  43 +
  44 + public ConcurrentMap<String, Integer> getMqttQoSMap() {
  45 + return mqttQoSMap;
  46 + }
  47 +
  48 + public MqttQoS getQoSForTopic(String topic) {
  49 + Integer qos = mqttQoSMap.get(topic);
  50 + if (qos != null) {
  51 + return MqttQoS.valueOf(qos);
  52 + } else {
  53 + return MqttQoS.AT_LEAST_ONCE;
  54 + }
  55 + }
  56 +
  57 +}
... ...