Showing
17 changed files
with
299 additions
and
122 deletions
... | ... | @@ -23,8 +23,6 @@ public class MqttDeviceProfileTransportConfiguration implements DeviceProfileTra |
23 | 23 | |
24 | 24 | private String deviceTelemetryTopic = MqttTopics.DEVICE_TELEMETRY_TOPIC; |
25 | 25 | private String deviceAttributesTopic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC; |
26 | - private String deviceRpcRequestTopic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; | |
27 | - private String deviceRpcResponseTopic = MqttTopics.DEVICE_RPC_RESPONSE_TOPIC; | |
28 | 26 | |
29 | 27 | @Override |
30 | 28 | public DeviceTransportType getType() { | ... | ... |
... | ... | @@ -15,6 +15,8 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.msg.session; |
17 | 17 | |
18 | +import org.thingsboard.server.common.data.DeviceProfile; | |
19 | + | |
18 | 20 | import java.util.UUID; |
19 | 21 | |
20 | 22 | public interface SessionContext { |
... | ... | @@ -22,4 +24,6 @@ public interface SessionContext { |
22 | 24 | UUID getSessionId(); |
23 | 25 | |
24 | 26 | int nextMsgId(); |
27 | + | |
28 | + void onProfileUpdate(DeviceProfile deviceProfile); | |
25 | 29 | } | ... | ... |
... | ... | @@ -99,9 +99,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
99 | 99 | private final SslHandler sslHandler; |
100 | 100 | private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; |
101 | 101 | |
102 | - private volatile SessionInfoProto sessionInfo; | |
102 | + private final DeviceSessionCtx deviceSessionCtx; | |
103 | 103 | private volatile InetSocketAddress address; |
104 | - private volatile DeviceSessionCtx deviceSessionCtx; | |
105 | 104 | private volatile GatewaySessionHandler gatewaySessionHandler; |
106 | 105 | |
107 | 106 | MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) { |
... | ... | @@ -152,7 +151,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
152 | 151 | case PINGREQ: |
153 | 152 | if (checkConnected(ctx, msg)) { |
154 | 153 | ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); |
155 | - transportService.reportActivity(sessionInfo); | |
154 | + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); | |
156 | 155 | } |
157 | 156 | break; |
158 | 157 | case DISCONNECT: |
... | ... | @@ -176,7 +175,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
176 | 175 | if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { |
177 | 176 | if (gatewaySessionHandler != null) { |
178 | 177 | handleGatewayPublishMsg(topicName, msgId, mqttMsg); |
179 | - transportService.reportActivity(sessionInfo); | |
178 | + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); | |
180 | 179 | } |
181 | 180 | } else { |
182 | 181 | processDevicePublish(ctx, mqttMsg, topicName, msgId); |
... | ... | @@ -215,26 +214,26 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
215 | 214 | |
216 | 215 | private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { |
217 | 216 | try { |
218 | - if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { | |
217 | + if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) { | |
219 | 218 | TransportProtos.PostTelemetryMsg postTelemetryMsg = adaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); |
220 | - transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); | |
221 | - } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) { | |
219 | + transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg)); | |
220 | + } else if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) { | |
222 | 221 | TransportProtos.PostAttributeMsg postAttributeMsg = adaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); |
223 | - transportService.process(sessionInfo, postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); | |
222 | + transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg)); | |
224 | 223 | } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { |
225 | 224 | TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg); |
226 | - transportService.process(sessionInfo, getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); | |
225 | + transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg)); | |
227 | 226 | } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) { |
228 | 227 | TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = adaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg); |
229 | - transportService.process(sessionInfo, rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); | |
228 | + transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); | |
230 | 229 | } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { |
231 | 230 | TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg); |
232 | - transportService.process(sessionInfo, rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); | |
231 | + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); | |
233 | 232 | } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { |
234 | 233 | TransportProtos.ClaimDeviceMsg claimDeviceMsg = adaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); |
235 | - transportService.process(sessionInfo, claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); | |
234 | + transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); | |
236 | 235 | } else { |
237 | - transportService.reportActivity(sessionInfo); | |
236 | + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); | |
238 | 237 | } |
239 | 238 | } catch (AdaptorException e) { |
240 | 239 | log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); |
... | ... | @@ -274,13 +273,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
274 | 273 | try { |
275 | 274 | switch (topic) { |
276 | 275 | case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { |
277 | - transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); | |
276 | + transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); | |
278 | 277 | registerSubQoS(topic, grantedQoSList, reqQoS); |
279 | 278 | activityReported = true; |
280 | 279 | break; |
281 | 280 | } |
282 | 281 | case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { |
283 | - transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null); | |
282 | + transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null); | |
284 | 283 | registerSubQoS(topic, grantedQoSList, reqQoS); |
285 | 284 | activityReported = true; |
286 | 285 | break; |
... | ... | @@ -303,7 +302,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
303 | 302 | } |
304 | 303 | } |
305 | 304 | if (!activityReported) { |
306 | - transportService.reportActivity(sessionInfo); | |
305 | + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); | |
307 | 306 | } |
308 | 307 | ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); |
309 | 308 | } |
... | ... | @@ -324,12 +323,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
324 | 323 | try { |
325 | 324 | switch (topicName) { |
326 | 325 | case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { |
327 | - transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null); | |
326 | + transportService.process(deviceSessionCtx.getSessionInfo(), | |
327 | + TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null); | |
328 | 328 | activityReported = true; |
329 | 329 | break; |
330 | 330 | } |
331 | 331 | case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { |
332 | - transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null); | |
332 | + transportService.process(deviceSessionCtx.getSessionInfo(), | |
333 | + TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null); | |
333 | 334 | activityReported = true; |
334 | 335 | break; |
335 | 336 | } |
... | ... | @@ -339,7 +340,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
339 | 340 | } |
340 | 341 | } |
341 | 342 | if (!activityReported) { |
342 | - transportService.reportActivity(sessionInfo); | |
343 | + transportService.reportActivity(deviceSessionCtx.getSessionInfo()); | |
343 | 344 | } |
344 | 345 | ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId())); |
345 | 346 | } |
... | ... | @@ -499,8 +500,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
499 | 500 | |
500 | 501 | private void doDisconnect() { |
501 | 502 | if (deviceSessionCtx.isConnected()) { |
502 | - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); | |
503 | - transportService.deregisterSession(sessionInfo); | |
503 | + transportService.process(deviceSessionCtx.getSessionInfo(), DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); | |
504 | + transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); | |
504 | 505 | if (gatewaySessionHandler != null) { |
505 | 506 | gatewaySessionHandler.onGatewayDisconnect(); |
506 | 507 | } |
... | ... | @@ -515,11 +516,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
515 | 516 | } else { |
516 | 517 | deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); |
517 | 518 | deviceSessionCtx.setDeviceProfile(msg.getDeviceProfile()); |
518 | - sessionInfo = SessionInfoCreator.create(msg, context, sessionId); | |
519 | - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() { | |
519 | + deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId)); | |
520 | + transportService.process(deviceSessionCtx.getSessionInfo(), DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() { | |
520 | 521 | @Override |
521 | 522 | public void onSuccess(Void msg) { |
522 | - transportService.registerAsyncSession(sessionInfo, MqttTransportHandler.this); | |
523 | + transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); | |
523 | 524 | checkGatewaySession(); |
524 | 525 | ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); |
525 | 526 | log.info("[{}] Client connected!", sessionId); |
... | ... | @@ -581,7 +582,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement |
581 | 582 | |
582 | 583 | @Override |
583 | 584 | public void onProfileUpdate(DeviceProfile deviceProfile) { |
584 | - deviceSessionCtx.getDeviceInfo().setDeviceType(deviceProfile.getName()); | |
585 | - sessionInfo = SessionInfoProto.newBuilder().mergeFrom(sessionInfo).setDeviceType(deviceProfile.getName()).build(); | |
585 | + deviceSessionCtx.onProfileUpdate(deviceProfile); | |
586 | 586 | } |
587 | 587 | } | ... | ... |
... | ... | @@ -18,6 +18,13 @@ package org.thingsboard.server.transport.mqtt.session; |
18 | 18 | import io.netty.channel.ChannelHandlerContext; |
19 | 19 | import lombok.Getter; |
20 | 20 | import lombok.extern.slf4j.Slf4j; |
21 | +import org.thingsboard.server.common.data.DeviceProfile; | |
22 | +import org.thingsboard.server.common.data.DeviceTransportType; | |
23 | +import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; | |
24 | +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; | |
25 | +import org.thingsboard.server.common.data.device.profile.MqttTopics; | |
26 | +import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter; | |
27 | +import org.thingsboard.server.transport.mqtt.util.MqttTopicFilterFactory; | |
21 | 28 | |
22 | 29 | import java.util.UUID; |
23 | 30 | import java.util.concurrent.ConcurrentMap; |
... | ... | @@ -31,7 +38,11 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { |
31 | 38 | |
32 | 39 | @Getter |
33 | 40 | private ChannelHandlerContext channel; |
34 | - private AtomicInteger msgIdSeq = new AtomicInteger(0); | |
41 | + private final AtomicInteger msgIdSeq = new AtomicInteger(0); | |
42 | + | |
43 | + private volatile MqttTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter(); | |
44 | + private volatile MqttTopicFilter attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); | |
45 | + | |
35 | 46 | |
36 | 47 | public DeviceSessionCtx(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) { |
37 | 48 | super(sessionId, mqttQoSMap); |
... | ... | @@ -44,4 +55,37 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { |
44 | 55 | public int nextMsgId() { |
45 | 56 | return msgIdSeq.incrementAndGet(); |
46 | 57 | } |
58 | + | |
59 | + public boolean isDeviceTelemetryTopic(String topicName) { | |
60 | + return telemetryTopicFilter.filter(topicName); | |
61 | + } | |
62 | + | |
63 | + public boolean isDeviceAttributesTopic(String topicName) { | |
64 | + return attributesTopicFilter.filter(topicName); | |
65 | + } | |
66 | + | |
67 | + @Override | |
68 | + public void setDeviceProfile(DeviceProfile deviceProfile) { | |
69 | + super.setDeviceProfile(deviceProfile); | |
70 | + updateTopicFilters(deviceProfile); | |
71 | + } | |
72 | + | |
73 | + @Override | |
74 | + public void onProfileUpdate(DeviceProfile deviceProfile) { | |
75 | + super.onProfileUpdate(deviceProfile); | |
76 | + updateTopicFilters(deviceProfile); | |
77 | + } | |
78 | + | |
79 | + private void updateTopicFilters(DeviceProfile deviceProfile) { | |
80 | + DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); | |
81 | + if (transportConfiguration.getType().equals(DeviceTransportType.MQTT) && | |
82 | + transportConfiguration instanceof MqttDeviceProfileTransportConfiguration) { | |
83 | + MqttDeviceProfileTransportConfiguration mqttConfig = (MqttDeviceProfileTransportConfiguration) transportConfiguration; | |
84 | + telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic()); | |
85 | + attributesTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic()); | |
86 | + } else { | |
87 | + telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter(); | |
88 | + attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); | |
89 | + } | |
90 | + } | |
47 | 91 | } | ... | ... |
... | ... | @@ -33,12 +33,12 @@ import java.util.concurrent.ConcurrentMap; |
33 | 33 | public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { |
34 | 34 | |
35 | 35 | private final GatewaySessionHandler parent; |
36 | - private volatile SessionInfoProto sessionInfo; | |
37 | 36 | |
38 | - public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) { | |
37 | + public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo, | |
38 | + DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) { | |
39 | 39 | super(UUID.randomUUID(), mqttQoSMap); |
40 | 40 | this.parent = parent; |
41 | - this.sessionInfo = SessionInfoProto.newBuilder() | |
41 | + setSessionInfo(SessionInfoProto.newBuilder() | |
42 | 42 | .setNodeId(parent.getNodeId()) |
43 | 43 | .setSessionIdMSB(sessionId.getMostSignificantBits()) |
44 | 44 | .setSessionIdLSB(sessionId.getLeastSignificantBits()) |
... | ... | @@ -52,7 +52,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple |
52 | 52 | .setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits()) |
53 | 53 | .setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits()) |
54 | 54 | .setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits()) |
55 | - .build(); | |
55 | + .build()); | |
56 | 56 | setDeviceInfo(deviceInfo); |
57 | 57 | setDeviceProfile(deviceProfile); |
58 | 58 | } |
... | ... | @@ -67,10 +67,6 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple |
67 | 67 | return parent.nextMsgId(); |
68 | 68 | } |
69 | 69 | |
70 | - SessionInfoProto getSessionInfo() { | |
71 | - return sessionInfo; | |
72 | - } | |
73 | - | |
74 | 70 | @Override |
75 | 71 | public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) { |
76 | 72 | try { |
... | ... | @@ -107,10 +103,4 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple |
107 | 103 | public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) { |
108 | 104 | // This feature is not supported in the TB IoT Gateway yet. |
109 | 105 | } |
110 | - | |
111 | - @Override | |
112 | - public void onProfileUpdate(DeviceProfile deviceProfile) { | |
113 | - deviceInfo.setDeviceType(deviceProfile.getName()); | |
114 | - sessionInfo = SessionInfoProto.newBuilder().mergeFrom(sessionInfo).setDeviceType(deviceProfile.getName()).build(); | |
115 | - } | |
116 | 106 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.transport.mqtt.util; | |
17 | + | |
18 | +import lombok.Data; | |
19 | + | |
20 | +@Data | |
21 | +public class EqualsTopicFilter implements MqttTopicFilter { | |
22 | + | |
23 | + private final String filter; | |
24 | + | |
25 | + @Override | |
26 | + public boolean filter(String topic) { | |
27 | + return filter.equals(topic); | |
28 | + } | |
29 | +} | ... | ... |
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/MqttTopicFilter.java
renamed from
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/MqttTopicRegexUtil.java
... | ... | @@ -15,20 +15,8 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.transport.mqtt.util; |
17 | 17 | |
18 | -import lombok.extern.slf4j.Slf4j; | |
18 | +public interface MqttTopicFilter { | |
19 | 19 | |
20 | -import java.util.regex.Pattern; | |
21 | - | |
22 | -@Slf4j | |
23 | -public class MqttTopicRegexUtil { | |
24 | - | |
25 | - public static Pattern toRegex(String topicFilter) { | |
26 | - String regex = topicFilter | |
27 | - .replace("\\", "\\\\") | |
28 | - .replace("+", "[^/]+") | |
29 | - .replace("/#", "($|/.*)"); | |
30 | - log.debug("Converting [{}] to [{}]", topicFilter, regex); | |
31 | - return Pattern.compile(regex); | |
32 | - } | |
20 | + boolean filter(String topic); | |
33 | 21 | |
34 | 22 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.transport.mqtt.util; | |
17 | + | |
18 | +import lombok.extern.slf4j.Slf4j; | |
19 | +import org.thingsboard.server.common.data.device.profile.MqttTopics; | |
20 | + | |
21 | +import java.util.concurrent.ConcurrentHashMap; | |
22 | +import java.util.concurrent.ConcurrentMap; | |
23 | +import java.util.regex.Pattern; | |
24 | + | |
25 | +@Slf4j | |
26 | +public class MqttTopicFilterFactory { | |
27 | + | |
28 | + private static final ConcurrentMap<String, MqttTopicFilter> filters = new ConcurrentHashMap<>(); | |
29 | + private static final MqttTopicFilter DEFAULT_TELEMETRY_TOPIC_FILTER = toFilter(MqttTopics.DEVICE_TELEMETRY_TOPIC); | |
30 | + private static final MqttTopicFilter DEFAULT_ATTRIBUTES_TOPIC_FILTER = toFilter(MqttTopics.DEVICE_ATTRIBUTES_TOPIC); | |
31 | + | |
32 | + public static MqttTopicFilter toFilter(String topicFilter) { | |
33 | + if (topicFilter == null || topicFilter.isEmpty()) { | |
34 | + throw new IllegalArgumentException("Topic filter can't be empty!"); | |
35 | + } | |
36 | + return filters.computeIfAbsent(topicFilter, filter -> { | |
37 | + if (filter.contains("+") || filter.contains("#")) { | |
38 | + String regex = filter | |
39 | + .replace("\\", "\\\\") | |
40 | + .replace("+", "[^/]+") | |
41 | + .replace("/#", "($|/.*)"); | |
42 | + log.debug("Converting [{}] to [{}]", filter, regex); | |
43 | + return new RegexTopicFilter(regex); | |
44 | + } else { | |
45 | + return new EqualsTopicFilter(filter); | |
46 | + } | |
47 | + }); | |
48 | + } | |
49 | + | |
50 | + public static MqttTopicFilter getDefaultTelemetryFilter() { | |
51 | + return DEFAULT_TELEMETRY_TOPIC_FILTER; | |
52 | + } | |
53 | + | |
54 | + public static MqttTopicFilter getDefaultAttributesFilter() { | |
55 | + return DEFAULT_ATTRIBUTES_TOPIC_FILTER; | |
56 | + } | |
57 | +} | ... | ... |
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/RegexTopicFilter.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.transport.mqtt.util; | |
17 | + | |
18 | +import lombok.Data; | |
19 | + | |
20 | +import java.util.regex.Pattern; | |
21 | + | |
22 | +@Data | |
23 | +public class RegexTopicFilter implements MqttTopicFilter { | |
24 | + | |
25 | + private final Pattern regex; | |
26 | + | |
27 | + public RegexTopicFilter(String regex) { | |
28 | + this.regex = Pattern.compile(regex); | |
29 | + } | |
30 | + | |
31 | + @Override | |
32 | + public boolean filter(String topic) { | |
33 | + return regex.matcher(topic).matches(); | |
34 | + } | |
35 | +} | ... | ... |
common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/util/MqttTopicFilterFactoryTest.java
renamed from
common/transport/mqtt/src/test/java/org/thingsboard/server/transport/mqtt/util/MqttTopicRegexUtilTest.java
... | ... | @@ -26,7 +26,7 @@ import static org.junit.Assert.assertFalse; |
26 | 26 | import static org.junit.Assert.assertTrue; |
27 | 27 | |
28 | 28 | @RunWith(MockitoJUnitRunner.class) |
29 | -public class MqttTopicRegexUtilTest { | |
29 | +public class MqttTopicFilterFactoryTest { | |
30 | 30 | |
31 | 31 | private static String TEST_STR_1 = "Sensor/Temperature/House/48"; |
32 | 32 | private static String TEST_STR_2 = "Sensor/Temperature"; |
... | ... | @@ -34,23 +34,23 @@ public class MqttTopicRegexUtilTest { |
34 | 34 | |
35 | 35 | @Test |
36 | 36 | public void metadataCanBeUpdated() throws ScriptException { |
37 | - Pattern filter = MqttTopicRegexUtil.toRegex("Sensor/Temperature/House/+"); | |
38 | - assertTrue(filter.matcher(TEST_STR_1).matches()); | |
39 | - assertFalse(filter.matcher(TEST_STR_2).matches()); | |
40 | - | |
41 | - filter = MqttTopicRegexUtil.toRegex("Sensor/+/House/#"); | |
42 | - assertTrue(filter.matcher(TEST_STR_1).matches()); | |
43 | - assertFalse(filter.matcher(TEST_STR_2).matches()); | |
44 | - | |
45 | - filter = MqttTopicRegexUtil.toRegex("Sensor/#"); | |
46 | - assertTrue(filter.matcher(TEST_STR_1).matches()); | |
47 | - assertTrue(filter.matcher(TEST_STR_2).matches()); | |
48 | - assertTrue(filter.matcher(TEST_STR_3).matches()); | |
49 | - | |
50 | - filter = MqttTopicRegexUtil.toRegex("Sensor/Temperature/#"); | |
51 | - assertTrue(filter.matcher(TEST_STR_1).matches()); | |
52 | - assertTrue(filter.matcher(TEST_STR_2).matches()); | |
53 | - assertFalse(filter.matcher(TEST_STR_3).matches()); | |
37 | + MqttTopicFilter filter = MqttTopicFilterFactory.toFilter("Sensor/Temperature/House/+"); | |
38 | + assertTrue(filter.filter(TEST_STR_1)); | |
39 | + assertFalse(filter.filter(TEST_STR_2)); | |
40 | + | |
41 | + filter = MqttTopicFilterFactory.toFilter("Sensor/+/House/#"); | |
42 | + assertTrue(filter.filter(TEST_STR_1)); | |
43 | + assertFalse(filter.filter(TEST_STR_2)); | |
44 | + | |
45 | + filter = MqttTopicFilterFactory.toFilter("Sensor/#"); | |
46 | + assertTrue(filter.filter(TEST_STR_1)); | |
47 | + assertTrue(filter.filter(TEST_STR_2)); | |
48 | + assertTrue(filter.filter(TEST_STR_3)); | |
49 | + | |
50 | + filter = MqttTopicFilterFactory.toFilter("Sensor/Temperature/#"); | |
51 | + assertTrue(filter.filter(TEST_STR_1)); | |
52 | + assertTrue(filter.filter(TEST_STR_2)); | |
53 | + assertFalse(filter.filter(TEST_STR_3)); | |
54 | 54 | } |
55 | 55 | |
56 | 56 | } | ... | ... |
... | ... | @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.DeviceProfile; |
22 | 22 | import org.thingsboard.server.common.data.id.DeviceId; |
23 | 23 | import org.thingsboard.server.common.msg.session.SessionContext; |
24 | 24 | import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; |
25 | +import org.thingsboard.server.gen.transport.TransportProtos; | |
25 | 26 | import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; |
26 | 27 | |
27 | 28 | import java.util.UUID; |
... | ... | @@ -41,6 +42,9 @@ public abstract class DeviceAwareSessionContext implements SessionContext { |
41 | 42 | @Getter |
42 | 43 | @Setter |
43 | 44 | protected volatile DeviceProfile deviceProfile; |
45 | + @Getter | |
46 | + @Setter | |
47 | + private volatile TransportProtos.SessionInfoProto sessionInfo; | |
44 | 48 | |
45 | 49 | private volatile boolean connected; |
46 | 50 | |
... | ... | @@ -54,6 +58,13 @@ public abstract class DeviceAwareSessionContext implements SessionContext { |
54 | 58 | this.deviceId = deviceInfo.getDeviceId(); |
55 | 59 | } |
56 | 60 | |
61 | + @Override | |
62 | + public void onProfileUpdate(DeviceProfile deviceProfile) { | |
63 | + this.deviceProfile = deviceProfile; | |
64 | + this.deviceInfo.setDeviceType(deviceProfile.getName()); | |
65 | + this.sessionInfo = TransportProtos.SessionInfoProto.newBuilder().mergeFrom(sessionInfo).setDeviceType(deviceProfile.getName()).build(); | |
66 | + } | |
67 | + | |
57 | 68 | public boolean isConnected() { |
58 | 69 | return connected; |
59 | 70 | } | ... | ... |
... | ... | @@ -19,7 +19,6 @@ |
19 | 19 | <section formGroupName="configuration"> |
20 | 20 | <fieldset class="fields-group"> |
21 | 21 | <legend class="group-title" translate>device-profile.mqtt-device-topic-filters</legend> |
22 | - <h6 class="mat-body" innerHTML="{{ 'device-profile.support-level-wildcards' | translate }}"></h6> | |
23 | 22 | <div fxLayoutGap="8px" fxLayout="column"> |
24 | 23 | <div fxLayout="row" fxLayoutGap="8px" fxLayout.xs="column"> |
25 | 24 | <mat-form-field fxFlex> |
... | ... | @@ -30,24 +29,13 @@ |
30 | 29 | <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceTelemetryTopic').hasError('required')"> |
31 | 30 | {{ 'device-profile.telemetry-topic-filter-required' | translate}} |
32 | 31 | </mat-error> |
33 | - <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceTelemetryTopic').hasError('pattern')"> | |
34 | - {{ 'device-profile.not-valid-pattern-topic-filter' | translate}} | |
32 | + <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceTelemetryTopic').hasError('invalidSingleTopicCharacter')"> | |
33 | + {{ 'device-profile.not-valid-single-character' | translate}} | |
35 | 34 | </mat-error> |
36 | - </mat-form-field> | |
37 | - <mat-form-field fxFlex> | |
38 | - <mat-label translate>device-profile.rpc-request-topic-filter</mat-label> | |
39 | - <input matInput required | |
40 | - formControlName="deviceRpcRequestTopic" | |
41 | - type="text"> | |
42 | - <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceRpcRequestTopic').hasError('required')"> | |
43 | - {{ 'device-profile.rpc-request-topic-filter-required' | translate}} | |
44 | - </mat-error> | |
45 | - <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceRpcRequestTopic').hasError('pattern')"> | |
46 | - {{ 'device-profile.not-valid-pattern-topic-filter' | translate}} | |
35 | + <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceTelemetryTopic').hasError('invalidMultiTopicCharacter')"> | |
36 | + {{ 'device-profile.not-valid-multi-character' | translate}} | |
47 | 37 | </mat-error> |
48 | 38 | </mat-form-field> |
49 | - </div> | |
50 | - <div fxLayout="row" fxLayoutGap="8px" fxLayout.xs="column"> | |
51 | 39 | <mat-form-field fxFlex> |
52 | 40 | <mat-label translate>device-profile.attributes-topic-filter</mat-label> |
53 | 41 | <input matInput required |
... | ... | @@ -56,23 +44,17 @@ |
56 | 44 | <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceAttributesTopic').hasError('required')"> |
57 | 45 | {{ 'device-profile.attributes-topic-filter-required' | translate}} |
58 | 46 | </mat-error> |
59 | - <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceAttributesTopic').hasError('pattern')"> | |
60 | - {{ 'device-profile.not-valid-pattern-topic-filter' | translate}} | |
61 | - </mat-error> | |
62 | - </mat-form-field> | |
63 | - <mat-form-field fxFlex> | |
64 | - <mat-label translate>device-profile.rpc-response-topic-filter</mat-label> | |
65 | - <input matInput required | |
66 | - formControlName="deviceRpcResponseTopic" | |
67 | - type="text"> | |
68 | - <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceRpcResponseTopic').hasError('required')"> | |
69 | - {{ 'device-profile.rpc-response-topic-filter-required' | translate}} | |
47 | + <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceAttributesTopic').hasError('invalidSingleTopicCharacter')"> | |
48 | + {{ 'device-profile.not-valid-single-character' | translate}} | |
70 | 49 | </mat-error> |
71 | - <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceRpcResponseTopic').hasError('pattern')"> | |
72 | - {{ 'device-profile.not-valid-pattern-topic-filter' | translate}} | |
50 | + <mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceAttributesTopic').hasError('invalidMultiTopicCharacter')"> | |
51 | + {{ 'device-profile.not-valid-multi-character' | translate}} | |
73 | 52 | </mat-error> |
74 | 53 | </mat-form-field> |
75 | 54 | </div> |
55 | + <div class="tb-hint" innerHTML="{{ 'device-profile.support-level-wildcards' | translate }}"></div> | |
56 | + <div class="tb-hint" innerHTML="{{ 'device-profile.single-level-wildcards-hint' | translate }}"></div> | |
57 | + <div class="tb-hint" innerHTML="{{ 'device-profile.multi-level-wildcards-hint' | translate }}"></div> | |
76 | 58 | </div> |
77 | 59 | </fieldset> |
78 | 60 | </section> | ... | ... |
... | ... | @@ -15,15 +15,24 @@ |
15 | 15 | /// |
16 | 16 | |
17 | 17 | import { Component, forwardRef, Input, OnInit } from '@angular/core'; |
18 | -import { ControlValueAccessor, FormBuilder, FormGroup, NG_VALUE_ACCESSOR, Validators } from '@angular/forms'; | |
18 | +import { | |
19 | + ControlValueAccessor, | |
20 | + FormBuilder, | |
21 | + FormControl, | |
22 | + FormGroup, | |
23 | + NG_VALUE_ACCESSOR, | |
24 | + ValidatorFn, | |
25 | + Validators | |
26 | +} from '@angular/forms'; | |
19 | 27 | import { Store } from '@ngrx/store'; |
20 | 28 | import { AppState } from '@app/core/core.state'; |
21 | 29 | import { coerceBooleanProperty } from '@angular/cdk/coercion'; |
22 | 30 | import { |
23 | 31 | DeviceProfileTransportConfiguration, |
24 | - DeviceTransportType, MqttDeviceProfileTransportConfiguration | |
32 | + DeviceTransportType, | |
33 | + MqttDeviceProfileTransportConfiguration | |
25 | 34 | } from '@shared/models/device.models'; |
26 | -import { isDefinedAndNotNull } from '../../../../../core/utils'; | |
35 | +import { isDefinedAndNotNull } from '@core/utils'; | |
27 | 36 | |
28 | 37 | @Component({ |
29 | 38 | selector: 'tb-mqtt-device-profile-transport-configuration', |
... | ... | @@ -41,11 +50,10 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control |
41 | 50 | |
42 | 51 | private requiredValue: boolean; |
43 | 52 | |
44 | - private MQTTTopicPattern = new RegExp('^((?![#+]).)*$|^(.*[^#]\/|^)#$|^(.*\/|^)\+(\/.*|$)$'); | |
45 | - | |
46 | 53 | get required(): boolean { |
47 | 54 | return this.requiredValue; |
48 | 55 | } |
56 | + | |
49 | 57 | @Input() |
50 | 58 | set required(value: boolean) { |
51 | 59 | this.requiredValue = coerceBooleanProperty(value); |
... | ... | @@ -70,10 +78,8 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control |
70 | 78 | ngOnInit() { |
71 | 79 | this.mqttDeviceProfileTransportConfigurationFormGroup = this.fb.group({ |
72 | 80 | configuration: this.fb.group({ |
73 | - deviceAttributesTopic: [null, [Validators.required, Validators.pattern(this.MQTTTopicPattern)]], | |
74 | - deviceTelemetryTopic: [null, [Validators.required, Validators.pattern(this.MQTTTopicPattern)]], | |
75 | - deviceRpcRequestTopic: [null, [Validators.required, Validators.pattern(this.MQTTTopicPattern)]], | |
76 | - deviceRpcResponseTopic: [null, [Validators.required, Validators.pattern(this.MQTTTopicPattern)]] | |
81 | + deviceAttributesTopic: [null, [Validators.required, this.validationMQTTTopic()]], | |
82 | + deviceTelemetryTopic: [null, [Validators.required, this.validationMQTTTopic()]] | |
77 | 83 | }) |
78 | 84 | }); |
79 | 85 | this.mqttDeviceProfileTransportConfigurationFormGroup.valueChanges.subscribe(() => { |
... | ... | @@ -104,4 +110,34 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control |
104 | 110 | } |
105 | 111 | this.propagateChange(configuration); |
106 | 112 | } |
113 | + | |
114 | + private validationMQTTTopic(): ValidatorFn { | |
115 | + return (c: FormControl) => { | |
116 | + const newTopic = c.value; | |
117 | + const wildcardSymbols = /[#+]/g; | |
118 | + let findSymbol = wildcardSymbols.exec(newTopic); | |
119 | + while (findSymbol) { | |
120 | + const index = findSymbol.index; | |
121 | + const currentSymbol = findSymbol[0]; | |
122 | + const prevSymbol = index > 0 ? newTopic[index - 1] : null; | |
123 | + const nextSymbol = index < (newTopic.length - 1) ? newTopic[index + 1] : null; | |
124 | + if (currentSymbol === '#' && (index !== (newTopic.length - 1) || (prevSymbol !== null && prevSymbol !== '/'))) { | |
125 | + return { | |
126 | + invalidMultiTopicCharacter: { | |
127 | + valid: false | |
128 | + } | |
129 | + }; | |
130 | + } | |
131 | + if (currentSymbol === '+' && ((prevSymbol !== null && prevSymbol !== '/') || (nextSymbol !== null && nextSymbol !== '/'))) { | |
132 | + return { | |
133 | + invalidSingleTopicCharacter: { | |
134 | + valid: false | |
135 | + } | |
136 | + }; | |
137 | + } | |
138 | + findSymbol = wildcardSymbols.exec(newTopic); | |
139 | + } | |
140 | + return null; | |
141 | + }; | |
142 | + } | |
107 | 143 | } | ... | ... |
... | ... | @@ -110,8 +110,6 @@ export interface DefaultDeviceProfileTransportConfiguration { |
110 | 110 | export interface MqttDeviceProfileTransportConfiguration { |
111 | 111 | deviceTelemetryTopic?: string; |
112 | 112 | deviceAttributesTopic?: string; |
113 | - deviceRpcRequestTopic?: string; | |
114 | - deviceRpcResponseTopic?: string; | |
115 | 113 | [key: string]: any; |
116 | 114 | } |
117 | 115 | |
... | ... | @@ -164,9 +162,7 @@ export function createDeviceProfileTransportConfiguration(type: DeviceTransportT |
164 | 162 | case DeviceTransportType.MQTT: |
165 | 163 | const mqttTransportConfiguration: MqttDeviceProfileTransportConfiguration = { |
166 | 164 | deviceTelemetryTopic: 'v1/devices/me/telemetry', |
167 | - deviceAttributesTopic: 'v1/devices/me/attributes', | |
168 | - deviceRpcRequestTopic: 'v1/devices/me/rpc/request/', | |
169 | - deviceRpcResponseTopic: 'v1/devices/me/rpc/response/' | |
165 | + deviceAttributesTopic: 'v1/devices/me/attributes' | |
170 | 166 | }; |
171 | 167 | transportConfiguration = {...mqttTransportConfiguration, type: DeviceTransportType.MQTT}; |
172 | 168 | break; | ... | ... |
... | ... | @@ -792,16 +792,18 @@ |
792 | 792 | "no-device-profiles-found": "No device profiles found.", |
793 | 793 | "create-new-device-profile": "Create a new one!", |
794 | 794 | "mqtt-device-topic-filters": "MQTT device topic filters", |
795 | - "support-level-wildcards": "Support single <code>(+)</code> and multi <code>(#)</code> level wildcards", | |
795 | + "support-level-wildcards": "Single <code>[+]</code> and multi-level <code>[#]</code> wildcards supported.", | |
796 | 796 | "telemetry-topic-filter": "Telemetry topic filter", |
797 | 797 | "telemetry-topic-filter-required": "Telemetry topic filter is required.", |
798 | - "rpc-request-topic-filter": "RPC request topic filter", | |
799 | - "rpc-request-topic-filter-required": "RPC request topic filter is required.", | |
800 | 798 | "attributes-topic-filter": "Attributes topic filter", |
801 | 799 | "attributes-topic-filter-required": "Attributes topic filter is required.", |
802 | 800 | "rpc-response-topic-filter": "RPC response topic filter", |
803 | 801 | "rpc-response-topic-filter-required": "RPC response topic filter is required.", |
804 | 802 | "not-valid-pattern-topic-filter": "Not valid pattern topic filter", |
803 | + "not-valid-single-character": "Invalid use of a single-level wildcard character", | |
804 | + "not-valid-multi-character": "Invalid use of a multi-level wildcard character", | |
805 | + "single-level-wildcards-hint": "<code>[+]</code> is suitable for any topic filter level. Ex.: <b>v1/devices/+/telemetry</b> or <b>+/devices/+/attributes</b>.", | |
806 | + "multi-level-wildcards-hint": "<code>[#]</code> can replace the topic filter itself and must be the last symbol of the topic. Ex.: <b>#</b> or <b>v1/devices/me/#</b>.", | |
805 | 807 | "alarm-rules": "Alarm rules ({{count}})", |
806 | 808 | "add-alarm-rule": "Add alarm rule", |
807 | 809 | "edit-alarm-rule": "Edit alarm rule", | ... | ... |