Commit 1f31bd7d187e7347f204530bfbbbf0c7591aa016

Authored by Mitia Shvayka
1 parent 331548bf

refactoring mqtt

@@ -51,6 +51,8 @@ import javax.security.cert.X509Certificate; @@ -51,6 +51,8 @@ import javax.security.cert.X509Certificate;
51 import java.net.InetSocketAddress; 51 import java.net.InetSocketAddress;
52 import java.util.ArrayList; 52 import java.util.ArrayList;
53 import java.util.List; 53 import java.util.List;
  54 +import java.util.Map;
  55 +import java.util.concurrent.ConcurrentHashMap;
54 56
55 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; 57 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
56 import static io.netty.handler.codec.mqtt.MqttMessageType.*; 58 import static io.netty.handler.codec.mqtt.MqttMessageType.*;
@@ -79,6 +81,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -79,6 +81,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
79 private volatile InetSocketAddress address; 81 private volatile InetSocketAddress address;
80 private volatile GatewaySessionCtx gatewaySessionCtx; 82 private volatile GatewaySessionCtx gatewaySessionCtx;
81 83
  84 + private Map<String,MqttQoS> mqttQoSMap = new ConcurrentHashMap<>();
  85 +
82 public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, 86 public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService,
83 MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) { 87 MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService) {
84 this.processor = processor; 88 this.processor = processor;
@@ -228,6 +232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -228,6 +232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
228 String topicName = subscription.topicName(); 232 String topicName = subscription.topicName();
229 //TODO: handle this qos level. 233 //TODO: handle this qos level.
230 MqttQoS reqQoS = subscription.qualityOfService(); 234 MqttQoS reqQoS = subscription.qualityOfService();
  235 + mqttQoSMap.put(topicName, reqQoS);
231 try { 236 try {
232 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { 237 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
233 AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); 238 AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, SUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);
@@ -244,6 +249,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -244,6 +249,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
244 grantedQoSList.add(getMinSupportedQos(reqQoS)); 249 grantedQoSList.add(getMinSupportedQos(reqQoS));
245 } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { 250 } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) {
246 grantedQoSList.add(getMinSupportedQos(reqQoS)); 251 grantedQoSList.add(getMinSupportedQos(reqQoS));
  252 + }else if (topicName.equals(GATEWAY_RPC_TOPIC)) {
  253 + grantedQoSList.add(getMinSupportedQos(reqQoS));
247 } else { 254 } else {
248 log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); 255 log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS);
249 grantedQoSList.add(FAILURE.value()); 256 grantedQoSList.add(FAILURE.value());
@@ -262,6 +269,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -262,6 +269,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
262 } 269 }
263 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); 270 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
264 for (String topicName : mqttMsg.payload().topics()) { 271 for (String topicName : mqttMsg.payload().topics()) {
  272 + mqttQoSMap.remove(topicName);
265 try { 273 try {
266 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { 274 if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
267 AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg); 275 AdaptorToSessionActorMsg msg = adaptor.convertToActorMsg(deviceSessionCtx, UNSUBSCRIBE_ATTRIBUTES_REQUEST, mqttMsg);