Commit 1f80e8db56669271fa2c4fe7a64748f440db18fc
Committed by
GitHub
Merge pull request #1114 from ShvaykaD/feature/mqtt-sub-qos
Mqtt Sub QoS
Showing
7 changed files
with
153 additions
and
51 deletions
... | ... | @@ -51,6 +51,9 @@ import javax.security.cert.X509Certificate; |
51 | 51 | import java.net.InetSocketAddress; |
52 | 52 | import java.util.ArrayList; |
53 | 53 | import java.util.List; |
54 | +import java.util.Map; | |
55 | +import java.util.concurrent.ConcurrentHashMap; | |
56 | +import java.util.concurrent.ConcurrentMap; | |
54 | 57 | |
55 | 58 | import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; |
56 | 59 | import static io.netty.handler.codec.mqtt.MqttMessageType.*; |
... | ... | @@ -75,6 +78,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
75 | 78 | private final RelationService relationService; |
76 | 79 | private final QuotaService quotaService; |
77 | 80 | private final SslHandler sslHandler; |
81 | + private final ConcurrentMap<String, Integer> mqttQoSMap; | |
82 | + | |
78 | 83 | private volatile boolean connected; |
79 | 84 | private volatile InetSocketAddress address; |
80 | 85 | private volatile GatewaySessionCtx gatewaySessionCtx; |
... | ... | @@ -86,7 +91,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
86 | 91 | this.relationService = relationService; |
87 | 92 | this.authService = authService; |
88 | 93 | this.adaptor = adaptor; |
89 | - this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor); | |
94 | + this.mqttQoSMap = new ConcurrentHashMap<>(); | |
95 | + this.deviceSessionCtx = new DeviceSessionCtx(processor, authService, adaptor, mqttQoSMap); | |
90 | 96 | this.sessionId = deviceSessionCtx.getSessionId().toUidStr(); |
91 | 97 | this.sslHandler = sslHandler; |
92 | 98 | this.quotaService = quotaService; |
... | ... | @@ -166,18 +172,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
166 | 172 | |
167 | 173 | private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) { |
168 | 174 | try { |
169 | - if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) { | |
170 | - gatewaySessionCtx.onDeviceTelemetry(mqttMsg); | |
171 | - } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { | |
172 | - gatewaySessionCtx.onDeviceAttributes(mqttMsg); | |
173 | - } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) { | |
174 | - gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); | |
175 | - } else if (topicName.equals(GATEWAY_RPC_TOPIC)) { | |
176 | - gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); | |
177 | - } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { | |
178 | - gatewaySessionCtx.onDeviceConnect(mqttMsg); | |
179 | - } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) { | |
180 | - gatewaySessionCtx.onDeviceDisconnect(mqttMsg); | |
175 | + switch (topicName) { | |
176 | + case GATEWAY_TELEMETRY_TOPIC: | |
177 | + gatewaySessionCtx.onDeviceTelemetry(mqttMsg); | |
178 | + break; | |
179 | + case GATEWAY_ATTRIBUTES_TOPIC: | |
180 | + gatewaySessionCtx.onDeviceAttributes(mqttMsg); | |
181 | + break; | |
182 | + case GATEWAY_ATTRIBUTES_REQUEST_TOPIC: | |
183 | + gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); | |
184 | + break; | |
185 | + case GATEWAY_RPC_TOPIC: | |
186 | + gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); | |
187 | + break; | |
188 | + case GATEWAY_CONNECT_TOPIC: | |
189 | + gatewaySessionCtx.onDeviceConnect(mqttMsg); | |
190 | + break; | |
191 | + case GATEWAY_DISCONNECT_TOPIC: | |
192 | + gatewaySessionCtx.onDeviceDisconnect(mqttMsg); | |
193 | + break; | |
181 | 194 | } |
182 | 195 | } catch (RuntimeException | AdaptorException e) { |
183 | 196 | log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); |
... | ... | @@ -225,52 +238,75 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
225 | 238 | log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); |
226 | 239 | List<Integer> grantedQoSList = new ArrayList<>(); |
227 | 240 | for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { |
228 | - String topicName = subscription.topicName(); | |
229 | - //TODO: handle this qos level. | |
241 | + String topic = subscription.topicName(); | |
230 | 242 | MqttQoS reqQoS = subscription.qualityOfService(); |
231 | 243 | try { |
232 | - if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { | |
233 | - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); | |
234 | - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); | |
235 | - grantedQoSList.add(getMinSupportedQos(reqQoS)); | |
236 | - } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { | |
237 | - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); | |
238 | - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); | |
239 | - grantedQoSList.add(getMinSupportedQos(reqQoS)); | |
240 | - } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) { | |
241 | - grantedQoSList.add(getMinSupportedQos(reqQoS)); | |
242 | - } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) { | |
243 | - deviceSessionCtx.setAllowAttributeResponses(); | |
244 | - grantedQoSList.add(getMinSupportedQos(reqQoS)); | |
245 | - } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { | |
246 | - grantedQoSList.add(getMinSupportedQos(reqQoS)); | |
247 | - } else { | |
248 | - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); | |
249 | - grantedQoSList.add(FAILURE.value()); | |
244 | + switch (topic) { | |
245 | + case DEVICE_ATTRIBUTES_TOPIC: { | |
246 | + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); | |
247 | + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); | |
248 | + registerSubQoS(topic, grantedQoSList, reqQoS); | |
249 | + break; | |
250 | + } | |
251 | + case DEVICE_RPC_REQUESTS_SUB_TOPIC: { | |
252 | + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); | |
253 | + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); | |
254 | + registerSubQoS(topic, grantedQoSList, reqQoS); | |
255 | + break; | |
256 | + } | |
257 | + case DEVICE_RPC_RESPONSE_SUB_TOPIC: | |
258 | + registerSubQoS(topic, grantedQoSList, reqQoS); | |
259 | + break; | |
260 | + case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: | |
261 | + deviceSessionCtx.setAllowAttributeResponses(); | |
262 | + registerSubQoS(topic, grantedQoSList, reqQoS); | |
263 | + break; | |
264 | + case GATEWAY_ATTRIBUTES_TOPIC: | |
265 | + registerSubQoS(topic, grantedQoSList, reqQoS); | |
266 | + break; | |
267 | + case GATEWAY_RPC_TOPIC: | |
268 | + registerSubQoS(topic, grantedQoSList, reqQoS); | |
269 | + break; | |
270 | + default: | |
271 | + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); | |
272 | + grantedQoSList.add(FAILURE.value()); | |
273 | + break; | |
250 | 274 | } |
251 | 275 | } catch (AdaptorException e) { |
252 | - log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); | |
276 | + log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); | |
253 | 277 | grantedQoSList.add(FAILURE.value()); |
254 | 278 | } |
255 | 279 | } |
256 | 280 | ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); |
257 | 281 | } |
258 | 282 | |
283 | + private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) { | |
284 | + grantedQoSList.add(getMinSupportedQos(reqQoS)); | |
285 | + mqttQoSMap.put(topic, getMinSupportedQos(reqQoS)); | |
286 | + } | |
287 | + | |
259 | 288 | private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { |
260 | 289 | if (!checkConnected(ctx)) { |
261 | 290 | return; |
262 | 291 | } |
263 | 292 | log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); |
264 | 293 | for (String topicName : mqttMsg.payload().topics()) { |
294 | + mqttQoSMap.remove(topicName); | |
265 | 295 | try { |
266 | - if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { | |
267 | - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); | |
268 | - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); | |
269 | - } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { | |
270 | - AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); | |
271 | - processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); | |
272 | - } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) { | |
273 | - deviceSessionCtx.setDisallowAttributeResponses(); | |
296 | + switch (topicName) { | |
297 | + case DEVICE_ATTRIBUTES_TOPIC: { | |
298 | + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); | |
299 | + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); | |
300 | + break; | |
301 | + } | |
302 | + case DEVICE_RPC_REQUESTS_SUB_TOPIC: { | |
303 | + AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, mqttMsg); | |
304 | + processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(), msg)); | |
305 | + break; | |
306 | + } | |
307 | + case DEVICE_ATTRIBUTES_RESPONSES_TOPIC: | |
308 | + deviceSessionCtx.setDisallowAttributeResponses(); | |
309 | + break; | |
274 | 310 | } |
275 | 311 | } catch (AdaptorException e) { |
276 | 312 | log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); | ... | ... |
... | ... | @@ -170,7 +170,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { |
170 | 170 | |
171 | 171 | private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) { |
172 | 172 | MqttFixedHeader mqttFixedHeader = |
173 | - new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); | |
173 | + new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0); | |
174 | 174 | MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId()); |
175 | 175 | ByteBuf payload = ALLOCATOR.buffer(); |
176 | 176 | payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); | ... | ... |
... | ... | @@ -30,13 +30,15 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService; |
30 | 30 | import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; |
31 | 31 | import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; |
32 | 32 | |
33 | +import java.util.Map; | |
34 | +import java.util.concurrent.ConcurrentMap; | |
33 | 35 | import java.util.concurrent.atomic.AtomicInteger; |
34 | 36 | |
35 | 37 | /** |
36 | 38 | * @author Andrew Shvayka |
37 | 39 | */ |
38 | 40 | @Slf4j |
39 | -public class DeviceSessionCtx extends DeviceAwareSessionContext { | |
41 | +public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { | |
40 | 42 | |
41 | 43 | private final MqttTransportAdaptor adaptor; |
42 | 44 | private final MqttSessionId sessionId; |
... | ... | @@ -44,8 +46,8 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext { |
44 | 46 | private volatile boolean allowAttributeResponses; |
45 | 47 | private AtomicInteger msgIdSeq = new AtomicInteger(0); |
46 | 48 | |
47 | - public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor) { | |
48 | - super(processor, authService); | |
49 | + public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) { | |
50 | + super(processor, authService, mqttQoSMap); | |
49 | 51 | this.adaptor = adaptor; |
50 | 52 | this.sessionId = new MqttSessionId(); |
51 | 53 | } | ... | ... |
... | ... | @@ -38,13 +38,15 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler; |
38 | 38 | |
39 | 39 | import java.nio.charset.Charset; |
40 | 40 | import java.util.List; |
41 | +import java.util.Map; | |
41 | 42 | import java.util.Optional; |
43 | +import java.util.concurrent.ConcurrentMap; | |
42 | 44 | import java.util.concurrent.atomic.AtomicInteger; |
43 | 45 | |
44 | 46 | /** |
45 | 47 | * Created by ashvayka on 19.01.17. |
46 | 48 | */ |
47 | -public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { | |
49 | +public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { | |
48 | 50 | |
49 | 51 | private static final Gson GSON = new Gson(); |
50 | 52 | private static final Charset UTF8 = Charset.forName("UTF-8"); |
... | ... | @@ -56,8 +58,8 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { |
56 | 58 | private volatile boolean closed; |
57 | 59 | private AtomicInteger msgIdSeq = new AtomicInteger(0); |
58 | 60 | |
59 | - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) { | |
60 | - super(parent.getProcessor(), parent.getAuthService(), device); | |
61 | + public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<String, Integer> mqttQoSMap) { | |
62 | + super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap); | |
61 | 63 | this.parent = parent; |
62 | 64 | this.sessionId = new MqttSessionId(); |
63 | 65 | } |
... | ... | @@ -195,7 +197,7 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { |
195 | 197 | |
196 | 198 | private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) { |
197 | 199 | MqttFixedHeader mqttFixedHeader = |
198 | - new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); | |
200 | + new MqttFixedHeader(MqttMessageType.PUBLISH, false, getQoSForTopic(topic), false, 0); | |
199 | 201 | MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet()); |
200 | 202 | ByteBuf payload = ALLOCATOR.buffer(); |
201 | 203 | payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); | ... | ... |
... | ... | @@ -43,6 +43,7 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler; |
43 | 43 | import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; |
44 | 44 | |
45 | 45 | import java.util.*; |
46 | +import java.util.concurrent.ConcurrentMap; | |
46 | 47 | import java.util.stream.Collectors; |
47 | 48 | |
48 | 49 | import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; |
... | ... | @@ -63,6 +64,7 @@ public class GatewaySessionCtx { |
63 | 64 | private final DeviceAuthService authService; |
64 | 65 | private final RelationService relationService; |
65 | 66 | private final Map<String, GatewayDeviceSessionCtx> devices; |
67 | + private final ConcurrentMap<String, Integer> mqttQoSMap; | |
66 | 68 | private ChannelHandlerContext channel; |
67 | 69 | |
68 | 70 | public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) { |
... | ... | @@ -73,6 +75,7 @@ public class GatewaySessionCtx { |
73 | 75 | this.gateway = gatewaySessionCtx.getDevice(); |
74 | 76 | this.gatewaySessionId = gatewaySessionCtx.getSessionId(); |
75 | 77 | this.devices = new HashMap<>(); |
78 | + this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap(); | |
76 | 79 | } |
77 | 80 | |
78 | 81 | public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { |
... | ... | @@ -96,7 +99,7 @@ public class GatewaySessionCtx { |
96 | 99 | relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created")); |
97 | 100 | processor.onDeviceAdded(device); |
98 | 101 | } |
99 | - GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device); | |
102 | + GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap); | |
100 | 103 | devices.put(deviceName, ctx); |
101 | 104 | log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName); |
102 | 105 | processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg()))); | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2018 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.transport.mqtt.session; | |
17 | + | |
18 | +import io.netty.handler.codec.mqtt.MqttQoS; | |
19 | +import org.thingsboard.server.common.data.Device; | |
20 | +import org.thingsboard.server.common.transport.SessionMsgProcessor; | |
21 | +import org.thingsboard.server.common.transport.auth.DeviceAuthService; | |
22 | +import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; | |
23 | + | |
24 | +import java.util.Map; | |
25 | +import java.util.concurrent.ConcurrentMap; | |
26 | + | |
27 | +/** | |
28 | + * Created by ashvayka on 30.08.18. | |
29 | + */ | |
30 | +public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext { | |
31 | + | |
32 | + private final ConcurrentMap<String, Integer> mqttQoSMap; | |
33 | + | |
34 | + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<String, Integer> mqttQoSMap) { | |
35 | + super(processor, authService); | |
36 | + this.mqttQoSMap = mqttQoSMap; | |
37 | + } | |
38 | + | |
39 | + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<String, Integer> mqttQoSMap) { | |
40 | + super(processor, authService, device); | |
41 | + this.mqttQoSMap = mqttQoSMap; | |
42 | + } | |
43 | + | |
44 | + public ConcurrentMap<String, Integer> getMqttQoSMap() { | |
45 | + return mqttQoSMap; | |
46 | + } | |
47 | + | |
48 | + public MqttQoS getQoSForTopic(String topic) { | |
49 | + Integer qos = mqttQoSMap.get(topic); | |
50 | + if (qos != null) { | |
51 | + return MqttQoS.valueOf(qos); | |
52 | + } else { | |
53 | + return MqttQoS.AT_LEAST_ONCE; | |
54 | + } | |
55 | + } | |
56 | + | |
57 | +} | ... | ... |