Commit 516d8791fdfaad0a04ff38bed7602f877f37a580
Committed by
GitHub
Merge pull request #1047 from ShvaykaD/develop/2.0
refactoring mqtt
Showing
1 changed file
with
8 additions
and
0 deletions
@@ -51,6 +51,8 @@ import javax.security.cert.X509Certificate; | @@ -51,6 +51,8 @@ import javax.security.cert.X509Certificate; | ||
51 | import java.net.InetSocketAddress; | 51 | import java.net.InetSocketAddress; |
52 | import java.util.ArrayList; | 52 | import java.util.ArrayList; |
53 | import java.util.List; | 53 | import java.util.List; |
54 | +import java.util.Map; | ||
55 | +import java.util.concurrent.ConcurrentHashMap; | ||
54 | 56 | ||
55 | import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; | 57 | import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; |
56 | import static io.netty.handler.codec.mqtt.MqttMessageType.*; | 58 | import static io.netty.handler.codec.mqtt.MqttMessageType.*; |
@@ -79,6 +81,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement | @@ -79,6 +81,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement | ||
79 | private volatile InetSocketAddress address; | 81 | private volatile InetSocketAddress address; |
80 | private volatile GatewaySessionCtx gatewaySessionCtx; | 82 | private volatile GatewaySessionCtx gatewaySessionCtx; |
81 | 83 | ||
84 | + private Map<String,MqttQoS> mqttQoSMap = new ConcurrentHashMap<>(); | ||
85 | + | ||
82 | public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, | 86 | public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, |
83 | MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { | 87 | MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { |
84 | this.processor = processor; | 88 | this.processor = processor; |
@@ -228,6 +232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement | @@ -228,6 +232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement | ||
228 | String topicName = subscription.topicName(); | 232 | String topicName = subscription.topicName(); |
229 | //TODO: handle this qos level. | 233 | //TODO: handle this qos level. |
230 | MqttQoS reqQoS = subscription.qualityOfService(); | 234 | MqttQoS reqQoS = subscription.qualityOfService(); |
235 | + mqttQoSMap.put(topicName, reqQoS); | ||
231 | try { | 236 | try { |
232 | if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { | 237 | if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { |
233 | AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); | 238 | AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); |
@@ -244,6 +249,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement | @@ -244,6 +249,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement | ||
244 | grantedQoSList.add(getMinSupportedQos(reqQoS)); | 249 | grantedQoSList.add(getMinSupportedQos(reqQoS)); |
245 | } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { | 250 | } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { |
246 | grantedQoSList.add(getMinSupportedQos(reqQoS)); | 251 | grantedQoSList.add(getMinSupportedQos(reqQoS)); |
252 | + }else if (topicName.equals(GATEWAY_RPC_TOPIC)) { | ||
253 | + grantedQoSList.add(getMinSupportedQos(reqQoS)); | ||
247 | } else { | 254 | } else { |
248 | log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); | 255 | log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); |
249 | grantedQoSList.add(FAILURE.value()); | 256 | grantedQoSList.add(FAILURE.value()); |
@@ -262,6 +269,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement | @@ -262,6 +269,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement | ||
262 | } | 269 | } |
263 | log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); | 270 | log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); |
264 | for (String topicName : mqttMsg.payload().topics()) { | 271 | for (String topicName : mqttMsg.payload().topics()) { |
272 | + mqttQoSMap.remove(topicName); | ||
265 | try { | 273 | try { |
266 | if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { | 274 | if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { |
267 | AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); | 275 | AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); |