Commit dbc7fe7abe543d1a1433300c781238dc7e5bc137

Authored by ShvaykaD
1 parent f14d2560

Fix Sub QoS topics handling

... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.mqtt.rpc;
17 17
18 18 import com.datastax.driver.core.utils.UUIDs;
  19 +import io.netty.handler.codec.mqtt.MqttQoS;
19 20 import lombok.extern.slf4j.Slf4j;
20 21 import org.apache.commons.lang3.StringUtils;
21 22 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
... ... @@ -23,19 +24,19 @@ import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
23 24 import org.eclipse.paho.client.mqttv3.MqttCallback;
24 25 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
25 26 import org.eclipse.paho.client.mqttv3.MqttMessage;
26   -import org.junit.After;
27   -import org.junit.Assert;
28   -import org.junit.Before;
29   -import org.junit.Test;
  27 +import org.junit.*;
30 28 import org.thingsboard.server.common.data.Device;
31 29 import org.thingsboard.server.common.data.Tenant;
32 30 import org.thingsboard.server.common.data.User;
33 31 import org.thingsboard.server.common.data.security.Authority;
34 32 import org.thingsboard.server.common.data.security.DeviceCredentials;
35 33 import org.thingsboard.server.controller.AbstractControllerTest;
  34 +import org.thingsboard.server.mqtt.telemetry.AbstractMqttTelemetryIntegrationTest;
36 35 import org.thingsboard.server.service.security.AccessValidator;
37 36
38 37 import java.util.Arrays;
  38 +import java.util.concurrent.CountDownLatch;
  39 +import java.util.concurrent.TimeUnit;
39 40
40 41 import static org.junit.Assert.assertEquals;
41 42 import static org.junit.Assert.assertNotNull;
... ... @@ -101,13 +102,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
101 102 MqttConnectOptions options = new MqttConnectOptions();
102 103 options.setUserName(accessToken);
103 104 client.connect(options).waitForCompletion();
104   - client.subscribe("v1/devices/me/rpc/request/+", 1);
105   - client.setCallback(new TestMqttCallback(client));
  105 +
  106 + TestMqttCallback callback = new TestMqttCallback(client);
  107 + client.setCallback(callback);
  108 + CountDownLatch latch = new CountDownLatch(1);
  109 + latch.countDown();
  110 + client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value());
106 111
107 112 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
108 113 String deviceId = savedDevice.getId().getId().toString();
109 114 String result = doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk());
110 115 Assert.assertTrue(StringUtils.isEmpty(result));
  116 + latch.await(3, TimeUnit.SECONDS);
  117 + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
111 118 }
112 119
113 120 @Test
... ... @@ -204,11 +211,16 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
204 211 private static class TestMqttCallback implements MqttCallback {
205 212
206 213 private final MqttAsyncClient client;
  214 + private Integer qoS;
207 215
208 216 TestMqttCallback(MqttAsyncClient client) {
209 217 this.client = client;
210 218 }
211 219
  220 + int getQoS() {
  221 + return qoS;
  222 + }
  223 +
212 224 @Override
213 225 public void connectionLost(Throwable throwable) {
214 226 }
... ... @@ -219,6 +231,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
219 231 MqttMessage message = new MqttMessage();
220 232 String responseTopic = requestTopic.replace("request", "response");
221 233 message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8"));
  234 + qoS = mqttMessage.getQos();
222 235 client.publish(responseTopic, message);
223 236 }
224 237
... ...
... ... @@ -15,10 +15,9 @@
15 15 */
16 16 package org.thingsboard.server.mqtt.telemetry;
17 17
  18 +import io.netty.handler.codec.mqtt.MqttQoS;
18 19 import lombok.extern.slf4j.Slf4j;
19   -import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
20   -import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
21   -import org.eclipse.paho.client.mqttv3.MqttMessage;
  20 +import org.eclipse.paho.client.mqttv3.*;
22 21 import org.junit.Before;
23 22 import org.junit.Ignore;
24 23 import org.junit.Test;
... ... @@ -30,9 +29,12 @@ import org.thingsboard.server.dao.service.DaoNoSqlTest;
30 29
31 30 import java.net.URI;
32 31 import java.util.*;
  32 +import java.util.concurrent.CountDownLatch;
  33 +import java.util.concurrent.TimeUnit;
33 34
34 35 import static org.junit.Assert.assertEquals;
35 36 import static org.junit.Assert.assertNotNull;
  37 +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
36 38
37 39 /**
38 40 * @author Valerii Sosliuk
... ... @@ -94,4 +96,60 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
94 96 assertEquals("3.0", values.get("key3").get(0).get("value"));
95 97 assertEquals("4", values.get("key4").get(0).get("value"));
96 98 }
  99 +
  100 + @Test
  101 + public void testMqttQoSLevel() throws Exception {
  102 + String clientId = MqttAsyncClient.generateClientId();
  103 + MqttAsyncClient client = new MqttAsyncClient(MQTT_URL, clientId);
  104 +
  105 + MqttConnectOptions options = new MqttConnectOptions();
  106 + options.setUserName(accessToken);
  107 + client.connect(options).waitForCompletion(3000);
  108 + TestMqttCallback callback = new TestMqttCallback(client);
  109 + client.setCallback(callback);
  110 + CountDownLatch latch = new CountDownLatch(1);
  111 + latch.countDown();
  112 + client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value());
  113 + String payload = "{\"key\":\"value\"}";
  114 + String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
  115 + latch.await(3, TimeUnit.SECONDS);
  116 + assertEquals(payload, callback.getPayload());
  117 + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
  118 + }
  119 +
  120 + private static class TestMqttCallback implements MqttCallback {
  121 +
  122 + private final MqttAsyncClient client;
  123 + private Integer qoS;
  124 + private String payload;
  125 +
  126 + String getPayload() {
  127 + return payload;
  128 + }
  129 +
  130 + TestMqttCallback(MqttAsyncClient client) {
  131 + this.client = client;
  132 + }
  133 +
  134 + int getQoS() {
  135 + return qoS;
  136 + }
  137 +
  138 + @Override
  139 + public void connectionLost(Throwable throwable) {
  140 + }
  141 +
  142 + @Override
  143 + public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
  144 + payload = new String(mqttMessage.getPayload());
  145 + qoS = mqttMessage.getQos();
  146 + }
  147 +
  148 + @Override
  149 + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  150 +
  151 + }
  152 + }
  153 +
  154 +
97 155 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.mqtt;
  17 +
  18 +import java.util.regex.Pattern;
  19 +
  20 +public class MqttTopicMatcher {
  21 +
  22 + private final String topic;
  23 + private final Pattern topicRegex;
  24 +
  25 + MqttTopicMatcher(String topic) {
  26 + if(topic == null){
  27 + throw new NullPointerException("topic");
  28 + }
  29 + this.topic = topic;
  30 + this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$");
  31 + }
  32 +
  33 + public String getTopic() {
  34 + return topic;
  35 + }
  36 +
  37 + public boolean matches(String topic){
  38 + return this.topicRegex.matcher(topic).matches();
  39 + }
  40 +
  41 + @Override
  42 + public boolean equals(Object o) {
  43 + if (this == o) return true;
  44 + if (o == null || getClass() != o.getClass()) return false;
  45 +
  46 + MqttTopicMatcher that = (MqttTopicMatcher) o;
  47 +
  48 + return topic.equals(that.topic);
  49 + }
  50 +
  51 + @Override
  52 + public int hashCode() {
  53 + return topic.hashCode();
  54 + }
  55 +}
... ...
... ... @@ -54,6 +54,7 @@ import java.util.List;
54 54 import java.util.Map;
55 55 import java.util.concurrent.ConcurrentHashMap;
56 56 import java.util.concurrent.ConcurrentMap;
  57 +import java.util.regex.Pattern;
57 58
58 59 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
59 60 import static io.netty.handler.codec.mqtt.MqttMessageType.*;
... ... @@ -78,7 +79,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
78 79 private final RelationService relationService;
79 80 private final QuotaService quotaService;
80 81 private final SslHandler sslHandler;
81   - private final ConcurrentMap<String, Integer> mqttQoSMap;
  82 + private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
82 83
83 84 private volatile boolean connected;
84 85 private volatile InetSocketAddress address;
... ... @@ -278,7 +279,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
278 279
279 280 private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
280 281 grantedQoSList.add(getMinSupportedQos(reqQoS));
281   - mqttQoSMap.put(topic, getMinSupportedQos(reqQoS));
  282 + mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS));
282 283 }
283 284
284 285 private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
... ... @@ -287,7 +288,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
287 288 }
288 289 log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
289 290 for (String topicName : mqttMsg.payload().topics()) {
290   - mqttQoSMap.remove(topicName);
  291 + mqttQoSMap.remove(new MqttTopicMatcher(topicName));
291 292 try {
292 293 switch (topicName) {
293 294 case DEVICE_ATTRIBUTES_TOPIC: {
... ...
... ... @@ -16,7 +16,10 @@
16 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 18 import io.netty.channel.ChannelHandlerContext;
19   -import io.netty.handler.codec.mqtt.*;
  19 +import io.netty.handler.codec.mqtt.MqttFixedHeader;
  20 +import io.netty.handler.codec.mqtt.MqttMessage;
  21 +import io.netty.handler.codec.mqtt.MqttMessageType;
  22 +import io.netty.handler.codec.mqtt.MqttQoS;
20 23 import lombok.extern.slf4j.Slf4j;
21 24 import org.thingsboard.server.common.data.id.SessionId;
22 25 import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg;
... ... @@ -27,10 +30,9 @@ import org.thingsboard.server.common.msg.session.ex.SessionException;
27 30 import org.thingsboard.server.common.transport.SessionMsgProcessor;
28 31 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
29 32 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
30   -import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  33 +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
31 34 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
32 35
33   -import java.util.Map;
34 36 import java.util.concurrent.ConcurrentMap;
35 37 import java.util.concurrent.atomic.AtomicInteger;
36 38
... ... @@ -46,7 +48,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
46 48 private volatile boolean allowAttributeResponses;
47 49 private AtomicInteger msgIdSeq = new AtomicInteger(0);
48 50
49   - public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<String, Integer> mqttQoSMap) {
  51 + public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
50 52 super(processor, authService, mqttQoSMap);
51 53 this.adaptor = adaptor;
52 54 this.sessionId = new MqttSessionId();
... ...
... ... @@ -33,6 +33,7 @@ import org.thingsboard.server.common.msg.session.*;
33 33 import org.thingsboard.server.common.msg.session.ex.SessionException;
34 34 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
35 35 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  36 +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
36 37 import org.thingsboard.server.transport.mqtt.MqttTopics;
37 38 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
38 39
... ... @@ -58,7 +59,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
58 59 private volatile boolean closed;
59 60 private AtomicInteger msgIdSeq = new AtomicInteger(0);
60 61
61   - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
  62 + public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
62 63 super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap);
63 64 this.parent = parent;
64 65 this.sessionId = new MqttSessionId();
... ...
... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
39 39 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
40 40 import org.thingsboard.server.dao.device.DeviceService;
41 41 import org.thingsboard.server.dao.relation.RelationService;
  42 +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
42 43 import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
43 44 import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
44 45
... ... @@ -64,7 +65,7 @@ public class GatewaySessionCtx {
64 65 private final DeviceAuthService authService;
65 66 private final RelationService relationService;
66 67 private final Map<String, GatewayDeviceSessionCtx> devices;
67   - private final ConcurrentMap<String, Integer> mqttQoSMap;
  68 + private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
68 69 private ChannelHandlerContext channel;
69 70
70 71 public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
... ...
... ... @@ -20,35 +20,42 @@ import org.thingsboard.server.common.data.Device;
20 20 import org.thingsboard.server.common.transport.SessionMsgProcessor;
21 21 import org.thingsboard.server.common.transport.auth.DeviceAuthService;
22 22 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  23 +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher;
23 24
  25 +import java.util.List;
24 26 import java.util.Map;
25 27 import java.util.concurrent.ConcurrentMap;
  28 +import java.util.stream.Collectors;
26 29
27 30 /**
28 31 * Created by ashvayka on 30.08.18.
29 32 */
30 33 public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
31 34
32   - private final ConcurrentMap<String, Integer> mqttQoSMap;
  35 + private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
33 36
34   - public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<String, Integer> mqttQoSMap) {
  37 + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
35 38 super(processor, authService);
36 39 this.mqttQoSMap = mqttQoSMap;
37 40 }
38 41
39   - public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
  42 + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
40 43 super(processor, authService, device);
41 44 this.mqttQoSMap = mqttQoSMap;
42 45 }
43 46
44   - public ConcurrentMap<String, Integer> getMqttQoSMap() {
  47 + public ConcurrentMap<MqttTopicMatcher, Integer> getMqttQoSMap() {
45 48 return mqttQoSMap;
46 49 }
47 50
48 51 public MqttQoS getQoSForTopic(String topic) {
49   - Integer qos = mqttQoSMap.get(topic);
50   - if (qos != null) {
51   - return MqttQoS.valueOf(qos);
  52 + List<Integer> qosList = mqttQoSMap.entrySet()
  53 + .stream()
  54 + .filter(entry -> entry.getKey().matches(topic))
  55 + .map(Map.Entry::getValue)
  56 + .collect(Collectors.toList());
  57 + if (!qosList.isEmpty()) {
  58 + return MqttQoS.valueOf(qosList.get(0));
52 59 } else {
53 60 return MqttQoS.AT_LEAST_ONCE;
54 61 }
... ...