Commit aa58338cfa03e21247d1e963906a957173f1f4f8

Authored by Igor Kulikov
Committed by GitHub
2 parents f14d2560 dbc7fe7a

Merge pull request #1139 from ShvaykaD/develop/2.1.2

Fix Sub QoS topics handling
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.mqtt.rpc; 16 package org.thingsboard.server.mqtt.rpc;
17 17
18 import com.datastax.driver.core.utils.UUIDs; 18 import com.datastax.driver.core.utils.UUIDs;
  19 +import io.netty.handler.codec.mqtt.MqttQoS;
19 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
20 import org.apache.commons.lang3.StringUtils; 21 import org.apache.commons.lang3.StringUtils;
21 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 22 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -23,19 +24,19 @@ import org.eclipse.paho.client.mqttv3.MqttAsyncClient; @@ -23,19 +24,19 @@ import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
23 import org.eclipse.paho.client.mqttv3.MqttCallback; 24 import org.eclipse.paho.client.mqttv3.MqttCallback;
24 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 25 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
25 import org.eclipse.paho.client.mqttv3.MqttMessage; 26 import org.eclipse.paho.client.mqttv3.MqttMessage;
26 -import org.junit.After;  
27 -import org.junit.Assert;  
28 -import org.junit.Before;  
29 -import org.junit.Test; 27 +import org.junit.*;
30 import org.thingsboard.server.common.data.Device; 28 import org.thingsboard.server.common.data.Device;
31 import org.thingsboard.server.common.data.Tenant; 29 import org.thingsboard.server.common.data.Tenant;
32 import org.thingsboard.server.common.data.User; 30 import org.thingsboard.server.common.data.User;
33 import org.thingsboard.server.common.data.security.Authority; 31 import org.thingsboard.server.common.data.security.Authority;
34 import org.thingsboard.server.common.data.security.DeviceCredentials; 32 import org.thingsboard.server.common.data.security.DeviceCredentials;
35 import org.thingsboard.server.controller.AbstractControllerTest; 33 import org.thingsboard.server.controller.AbstractControllerTest;
  34 +import org.thingsboard.server.mqtt.telemetry.AbstractMqttTelemetryIntegrationTest;
36 import org.thingsboard.server.service.security.AccessValidator; 35 import org.thingsboard.server.service.security.AccessValidator;
37 36
38 import java.util.Arrays; 37 import java.util.Arrays;
  38 +import java.util.concurrent.CountDownLatch;
  39 +import java.util.concurrent.TimeUnit;
39 40
40 import static org.junit.Assert.assertEquals; 41 import static org.junit.Assert.assertEquals;
41 import static org.junit.Assert.assertNotNull; 42 import static org.junit.Assert.assertNotNull;
@@ -101,13 +102,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -101,13 +102,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
101 MqttConnectOptions options = new MqttConnectOptions(); 102 MqttConnectOptions options = new MqttConnectOptions();
102 options.setUserName(accessToken); 103 options.setUserName(accessToken);
103 client.connect(options).waitForCompletion(); 104 client.connect(options).waitForCompletion();
104 - client.subscribe("v1/devices/me/rpc/request/+", 1);  
105 - client.setCallback(new TestMqttCallback(client)); 105 +
  106 + TestMqttCallback callback = new TestMqttCallback(client);
  107 + client.setCallback(callback);
  108 + CountDownLatch latch = new CountDownLatch(1);
  109 + latch.countDown();
  110 + client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value());
106 111
107 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; 112 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
108 String deviceId = savedDevice.getId().getId().toString(); 113 String deviceId = savedDevice.getId().getId().toString();
109 String result = doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); 114 String result = doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk());
110 Assert.assertTrue(StringUtils.isEmpty(result)); 115 Assert.assertTrue(StringUtils.isEmpty(result));
  116 + latch.await(3, TimeUnit.SECONDS);
  117 + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
111 } 118 }
112 119
113 @Test 120 @Test
@@ -204,11 +211,16 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -204,11 +211,16 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
204 private static class TestMqttCallback implements MqttCallback { 211 private static class TestMqttCallback implements MqttCallback {
205 212
206 private final MqttAsyncClient client; 213 private final MqttAsyncClient client;
  214 + private Integer qoS;
207 215
208 TestMqttCallback(MqttAsyncClient client) { 216 TestMqttCallback(MqttAsyncClient client) {
209 this.client = client; 217 this.client = client;
210 } 218 }
211 219
  220 + int getQoS() {
  221 + return qoS;
  222 + }
  223 +
212 @Override 224 @Override
213 public void connectionLost(Throwable throwable) { 225 public void connectionLost(Throwable throwable) {
214 } 226 }
@@ -219,6 +231,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -219,6 +231,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
219 MqttMessage message = new MqttMessage(); 231 MqttMessage message = new MqttMessage();
220 String responseTopic = requestTopic.replace("request", "response"); 232 String responseTopic = requestTopic.replace("request", "response");
221 message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8")); 233 message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8"));
  234 + qoS = mqttMessage.getQos();
222 client.publish(responseTopic, message); 235 client.publish(responseTopic, message);
223 } 236 }
224 237
@@ -15,10 +15,9 @@ @@ -15,10 +15,9 @@
15 */ 15 */
16 package org.thingsboard.server.mqtt.telemetry; 16 package org.thingsboard.server.mqtt.telemetry;
17 17
  18 +import io.netty.handler.codec.mqtt.MqttQoS;
18 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
19 -import org.eclipse.paho.client.mqttv3.MqttAsyncClient;  
20 -import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
21 -import org.eclipse.paho.client.mqttv3.MqttMessage; 20 +import org.eclipse.paho.client.mqttv3.*;
22 import org.junit.Before; 21 import org.junit.Before;
23 import org.junit.Ignore; 22 import org.junit.Ignore;
24 import org.junit.Test; 23 import org.junit.Test;
@@ -30,9 +29,12 @@ import org.thingsboard.server.dao.service.DaoNoSqlTest; @@ -30,9 +29,12 @@ import org.thingsboard.server.dao.service.DaoNoSqlTest;
30 29
31 import java.net.URI; 30 import java.net.URI;
32 import java.util.*; 31 import java.util.*;
  32 +import java.util.concurrent.CountDownLatch;
  33 +import java.util.concurrent.TimeUnit;
33 34
34 import static org.junit.Assert.assertEquals; 35 import static org.junit.Assert.assertEquals;
35 import static org.junit.Assert.assertNotNull; 36 import static org.junit.Assert.assertNotNull;
  37 +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
36 38
37 /** 39 /**
38 * @author Valerii Sosliuk 40 * @author Valerii Sosliuk
@@ -94,4 +96,60 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr @@ -94,4 +96,60 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
94 assertEquals("3.0", values.get("key3").get(0).get("value")); 96 assertEquals("3.0", values.get("key3").get(0).get("value"));
95 assertEquals("4", values.get("key4").get(0).get("value")); 97 assertEquals("4", values.get("key4").get(0).get("value"));
96 } 98 }
  99 +
  100 + @Test
  101 + public void testMqttQoSLevel() throws Exception {
  102 + String clientId = MqttAsyncClient.generateClientId();
  103 + MqttAsyncClient client = new MqttAsyncClient(MQTT_URL, clientId);
  104 +
  105 + MqttConnectOptions options = new MqttConnectOptions();
  106 + options.setUserName(accessToken);
  107 + client.connect(options).waitForCompletion(3000);
  108 + TestMqttCallback callback = new TestMqttCallback(client);
  109 + client.setCallback(callback);
  110 + CountDownLatch latch = new CountDownLatch(1);
  111 + latch.countDown();
  112 + client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value());
  113 + String payload = "{\"key\":\"value\"}";
  114 + String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
  115 + latch.await(3, TimeUnit.SECONDS);
  116 + assertEquals(payload, callback.getPayload());
  117 + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
  118 + }
  119 +
  120 + private static class TestMqttCallback implements MqttCallback {
  121 +
  122 + private final MqttAsyncClient client;
  123 + private Integer qoS;
  124 + private String payload;
  125 +
  126 + String getPayload() {
  127 + return payload;
  128 + }
  129 +
  130 + TestMqttCallback(MqttAsyncClient client) {
  131 + this.client = client;
  132 + }
  133 +
  134 + int getQoS() {
  135 + return qoS;
  136 + }
  137 +
  138 + @Override
  139 + public void connectionLost(Throwable throwable) {
  140 + }
  141 +
  142 + @Override
  143 + public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
  144 + payload = new String(mqttMessage.getPayload());
  145 + qoS = mqttMessage.getQos();
  146 + }
  147 +
  148 + @Override
  149 + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  150 +
  151 + }
  152 + }
  153 +
  154 +
97 } 155 }
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.mqtt;
  17 +
  18 +import java.util.regex.Pattern;
  19 +
  20 +public class MqttTopicMatcher {
  21 +
  22 + private final String topic;
  23 + private final Pattern topicRegex;
  24 +
  25 + MqttTopicMatcher(String topic) {
  26 + if(topic == null){
  27 + throw new NullPointerException("topic");
  28 + }
  29 + this.topic = topic;
  30 + this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$");
  31 + }
  32 +
  33 + public String getTopic() {
  34 + return topic;
  35 + }
  36 +
  37 + public boolean matches(String topic){
  38 + return this.topicRegex.matcher(topic).matches();
  39 + }
  40 +
  41 + @Override
  42 + public boolean equals(Object o) {
  43 + if (this == o) return true;
  44 + if (o == null || getClass() != o.getClass()) return false;
  45 +
  46 + MqttTopicMatcher that = (MqttTopicMatcher) o;
  47 +
  48 + return topic.equals(that.topic);
  49 + }
  50 +
  51 + @Override
  52 + public int hashCode() {
  53 + return topic.hashCode();
  54 + }
  55 +}
@@ -54,6 +54,7 @@ import java.util.List; @@ -54,6 +54,7 @@ import java.util.List;
54 import java.util.Map; 54 import java.util.Map;
55 import java.util.concurrent.ConcurrentHashMap; 55 import java.util.concurrent.ConcurrentHashMap;
56 import java.util.concurrent.ConcurrentMap; 56 import java.util.concurrent.ConcurrentMap;
  57 +import java.util.regex.Pattern;
57 58
58 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; 59 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
59 import static io.netty.handler.codec.mqtt.MqttMessageType.*; 60 import static io.netty.handler.codec.mqtt.MqttMessageType.*;
@@ -78,7 +79,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -78,7 +79,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
78 private final RelationService relationService; 79 private final RelationService relationService;
79 private final QuotaService quotaService; 80 private final QuotaService quotaService;
80 private final SslHandler sslHandler; 81 private final SslHandler sslHandler;
81 - private final ConcurrentMap<String, Integer> mqttQoSMap; 82 + private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
82 83
83 private volatile boolean connected; 84 private volatile boolean connected;
84 private volatile InetSocketAddress address; 85 private volatile InetSocketAddress address;
@@ -278,7 +279,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -278,7 +279,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
278 279
279 private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) { 280 private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
280 grantedQoSList.add(getMinSupportedQos(reqQoS)); 281 grantedQoSList.add(getMinSupportedQos(reqQoS));
281 - mqttQoSMap.put(topic, getMinSupportedQos(reqQoS)); 282 + mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS));
282 } 283 }
283 284
284 private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { 285 private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
@@ -287,7 +288,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -287,7 +288,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
287 } 288 }
288 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); 289 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
289 for (String topicName : mqttMsg.payload().topics()) { 290 for (String topicName : mqttMsg.payload().topics()) {
290 - mqttQoSMap.remove(topicName); 291 + mqttQoSMap.remove(new MqttTopicMatcher(topicName));
291 try { 292 try {
292 switch (topicName) { 293 switch (topicName) {
293 case DEVICE_ATTRIBUTES_TOPIC: { 294 case DEVICE_ATTRIBUTES_TOPIC: {
@@ -16,7 +16,10 @@ @@ -16,7 +16,10 @@
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 import io.netty.channel.ChannelHandlerContext; 18 import io.netty.channel.ChannelHandlerContext;
19 -import io.netty.handler.codec.mqtt.*; 19 +import io.netty.handler.codec.mqtt.MqttFixedHeader;
  20 +import io.netty.handler.codec.mqtt.MqttMessage;
  21 +import io.netty.handler.codec.mqtt.MqttMessageType;
  22 +import io.netty.handler.codec.mqtt.MqttQoS;
20 import lombok.extern.slf4j.Slf4j; 23 import lombok.extern.slf4j.Slf4j;
21 import org.thingsboard.server.common.data.id.SessionId; 24 import org.thingsboard.server.common.data.id.SessionId;
22 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; 25 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
@@ -27,10 +30,9 @@ import org.thingsboard.server.common.msg.session.ex.SessionException; @@ -27,10 +30,9 @@ import org.thingsboard.server.common.msg.session.ex.SessionException;
27 import org.thingsboard.server.common.transport.SessionMsgProcessor; 30 import org.thingsboard.server.common.transport.SessionMsgProcessor;
28 import org.thingsboard.server.common.transport.adaptor.AdaptorException; 31 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
29 import org.thingsboard.server.common.transport.auth.DeviceAuthService; 32 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
30 -import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 33 +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
31 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; 34 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
32 35
33 -import java.util.Map;  
34 import java.util.concurrent.ConcurrentMap; 36 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.atomic.AtomicInteger; 37 import java.util.concurrent.atomic.AtomicInteger;
36 38
@@ -46,7 +48,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { @@ -46,7 +48,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
46 private volatile boolean allowAttributeResponses; 48 private volatile boolean allowAttributeResponses;
47 private AtomicInteger msgIdSeq = new AtomicInteger(0); 49 private AtomicInteger msgIdSeq = new AtomicInteger(0);
48 50
49 - public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) { 51 + public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
50 super(processor, authService, mqttQoSMap); 52 super(processor, authService, mqttQoSMap);
51 this.adaptor = adaptor; 53 this.adaptor = adaptor;
52 this.sessionId = new MqttSessionId(); 54 this.sessionId = new MqttSessionId();
@@ -33,6 +33,7 @@ import org.thingsboard.server.common.msg.session.*; @@ -33,6 +33,7 @@ import org.thingsboard.server.common.msg.session.*;
33 import org.thingsboard.server.common.msg.session.ex.SessionException; 33 import org.thingsboard.server.common.msg.session.ex.SessionException;
34 import org.thingsboard.server.common.transport.adaptor.JsonConverter; 34 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
35 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 35 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  36 +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
36 import org.thingsboard.server.transport.mqtt.MqttTopics; 37 import org.thingsboard.server.transport.mqtt.MqttTopics;
37 import org.thingsboard.server.transport.mqtt.MqttTransportHandler; 38 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
38 39
@@ -58,7 +59,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { @@ -58,7 +59,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
58 private volatile boolean closed; 59 private volatile boolean closed;
59 private AtomicInteger msgIdSeq = new AtomicInteger(0); 60 private AtomicInteger msgIdSeq = new AtomicInteger(0);
60 61
61 - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<String, Integer> mqttQoSMap) { 62 + public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
62 super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap); 63 super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap);
63 this.parent = parent; 64 this.parent = parent;
64 this.sessionId = new MqttSessionId(); 65 this.sessionId = new MqttSessionId();
@@ -39,6 +39,7 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; @@ -39,6 +39,7 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
39 import org.thingsboard.server.common.transport.auth.DeviceAuthService; 39 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
40 import org.thingsboard.server.dao.device.DeviceService; 40 import org.thingsboard.server.dao.device.DeviceService;
41 import org.thingsboard.server.dao.relation.RelationService; 41 import org.thingsboard.server.dao.relation.RelationService;
  42 +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
42 import org.thingsboard.server.transport.mqtt.MqttTransportHandler; 43 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
43 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; 44 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
44 45
@@ -64,7 +65,7 @@ public class GatewaySessionCtx { @@ -64,7 +65,7 @@ public class GatewaySessionCtx {
64 private final DeviceAuthService authService; 65 private final DeviceAuthService authService;
65 private final RelationService relationService; 66 private final RelationService relationService;
66 private final Map<String, GatewayDeviceSessionCtx> devices; 67 private final Map<String, GatewayDeviceSessionCtx> devices;
67 - private final ConcurrentMap<String, Integer> mqttQoSMap; 68 + private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
68 private ChannelHandlerContext channel; 69 private ChannelHandlerContext channel;
69 70
70 public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) { 71 public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
@@ -20,35 +20,42 @@ import org.thingsboard.server.common.data.Device; @@ -20,35 +20,42 @@ import org.thingsboard.server.common.data.Device;
20 import org.thingsboard.server.common.transport.SessionMsgProcessor; 20 import org.thingsboard.server.common.transport.SessionMsgProcessor;
21 import org.thingsboard.server.common.transport.auth.DeviceAuthService; 21 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
22 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 22 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  23 +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
23 24
  25 +import java.util.List;
24 import java.util.Map; 26 import java.util.Map;
25 import java.util.concurrent.ConcurrentMap; 27 import java.util.concurrent.ConcurrentMap;
  28 +import java.util.stream.Collectors;
26 29
27 /** 30 /**
28 * Created by ashvayka on 30.08.18. 31 * Created by ashvayka on 30.08.18.
29 */ 32 */
30 public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext { 33 public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
31 34
32 - private final ConcurrentMap<String, Integer> mqttQoSMap; 35 + private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
33 36
34 - public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<String, Integer> mqttQoSMap) { 37 + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
35 super(processor, authService); 38 super(processor, authService);
36 this.mqttQoSMap = mqttQoSMap; 39 this.mqttQoSMap = mqttQoSMap;
37 } 40 }
38 41
39 - public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<String, Integer> mqttQoSMap) { 42 + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
40 super(processor, authService, device); 43 super(processor, authService, device);
41 this.mqttQoSMap = mqttQoSMap; 44 this.mqttQoSMap = mqttQoSMap;
42 } 45 }
43 46
44 - public ConcurrentMap<String, Integer> getMqttQoSMap() { 47 + public ConcurrentMap<MqttTopicMatcher, Integer> getMqttQoSMap() {
45 return mqttQoSMap; 48 return mqttQoSMap;
46 } 49 }
47 50
48 public MqttQoS getQoSForTopic(String topic) { 51 public MqttQoS getQoSForTopic(String topic) {
49 - Integer qos = mqttQoSMap.get(topic);  
50 - if (qos != null) {  
51 - return MqttQoS.valueOf(qos); 52 + List<Integer> qosList = mqttQoSMap.entrySet()
  53 + .stream()
  54 + .filter(entry -> entry.getKey().matches(topic))
  55 + .map(Map.Entry::getValue)
  56 + .collect(Collectors.toList());
  57 + if (!qosList.isEmpty()) {
  58 + return MqttQoS.valueOf(qosList.get(0));
52 } else { 59 } else {
53 return MqttQoS.AT_LEAST_ONCE; 60 return MqttQoS.AT_LEAST_ONCE;
54 } 61 }