Commit 0da8eb4ebbfb760c984395541b161a09eb49651d

Authored by 云中非
1 parent eb21d2d6

feat: TCP协议支持网关模式

... ... @@ -42,6 +42,7 @@ public class TcpTransportContext extends TransportContext {
42 42 @Autowired(required = false)
43 43 private TcpSslHandlerProvider sslHandlerProvider;
44 44
  45 + /**注入多种数据协议处理器,例如:modbus等*/
45 46 @Getter
46 47 @Autowired
47 48 private JsonTcpAdaptor jsonTcpAdaptor;
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.transport.tcp;
17 17
  18 +import com.fasterxml.jackson.databind.JsonNode;
18 19 import io.netty.buffer.ByteBuf;
19 20 import io.netty.buffer.Unpooled;
20 21 import io.netty.channel.ChannelFuture;
... ... @@ -27,8 +28,10 @@ import io.netty.util.concurrent.Future;
27 28 import io.netty.util.concurrent.GenericFutureListener;
28 29 import lombok.extern.slf4j.Slf4j;
29 30 import org.apache.commons.lang3.StringUtils;
30   -import org.thingsboard.server.common.data.*;
31   -import org.thingsboard.server.common.data.device.profile.MqttTopics;
  31 +import org.thingsboard.server.common.data.DataConstants;
  32 +import org.thingsboard.server.common.data.Device;
  33 +import org.thingsboard.server.common.data.DeviceProfile;
  34 +import org.thingsboard.server.common.data.DeviceTransportType;
32 35 import org.thingsboard.server.common.data.id.DeviceId;
33 36 import org.thingsboard.server.common.data.id.OtaPackageId;
34 37 import org.thingsboard.server.common.data.ota.OtaPackageType;
... ... @@ -41,17 +44,21 @@ import org.thingsboard.server.common.transport.TransportService;
41 44 import org.thingsboard.server.common.transport.TransportServiceCallback;
42 45 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
43 46 import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
  47 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
44 48 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
  49 +import org.thingsboard.server.common.transport.service.DefaultTransportService;
  50 +import org.thingsboard.server.common.transport.service.SessionMetaData;
45 51 import org.thingsboard.server.common.transport.util.SslUtil;
46 52 import org.thingsboard.server.gen.transport.TransportProtos;
47 53 import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
48   -import org.thingsboard.server.queue.scheduler.SchedulerComponent;
49 54 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
50   -import org.thingsboard.server.transport.tcp.session.DeviceSessionCtx;
  55 +import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx;
51 56 import org.thingsboard.server.transport.tcp.session.TCPMessage;
  57 +import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler;
52 58 import org.thingsboard.server.transport.tcp.util.ByteUtils;
53 59
54 60 import javax.net.ssl.SSLPeerUnverifiedException;
  61 +import java.io.IOException;
55 62 import java.io.UnsupportedEncodingException;
56 63 import java.net.InetSocketAddress;
57 64 import java.security.cert.Certificate;
... ... @@ -62,13 +69,13 @@ import java.util.UUID;
62 69 import java.util.concurrent.ConcurrentHashMap;
63 70 import java.util.concurrent.ConcurrentMap;
64 71 import java.util.regex.Matcher;
65   -import java.util.regex.Pattern;
66 72
67 73 import static com.amazonaws.util.StringUtils.UTF8;
68 74 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
69 75 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
70   -import static io.netty.handler.codec.mqtt.MqttMessageType.*;
71   -import static io.netty.handler.codec.mqtt.MqttQoS.*;
  76 +import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
  77 +import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
  78 +import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
72 79 import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
73 80 import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
74 81
... ... @@ -82,14 +89,14 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
82 89 private final UUID sessionId;
83 90 private final TcpTransportContext context;
84 91 private final TransportService transportService;
85   - private final SchedulerComponent scheduler;
86 92 private final SslHandler sslHandler;
87 93
88 94
89 95 /**需要处理的消息队列,例如:需要下发给设备的,设备上传的。*/
90   - final DeviceSessionCtx deviceSessionCtx;
  96 + final TcpDeviceSessionCtx deviceSessionCtx;
91 97 volatile InetSocketAddress address;
92 98
  99 + volatile TcpGatewaySessionHandler gatewaySessionHandler;
93 100 private final ConcurrentHashMap<String, String> otaPackSessions;
94 101 private final ConcurrentHashMap<String, Integer> chunkSizes;
95 102 private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck;
... ... @@ -100,9 +107,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
100 107 this.sessionId = UUID.randomUUID();
101 108 this.context = context;
102 109 this.transportService = context.getTransportService();
103   - this.scheduler = context.getScheduler();
104 110 this.sslHandler = sslHandler;
105   - this.deviceSessionCtx = new DeviceSessionCtx(sessionId, context);
  111 + this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
106 112 this.otaPackSessions = new ConcurrentHashMap<>();
107 113 this.chunkSizes = new ConcurrentHashMap<>();
108 114 this.rpcAwaitingAck = new ConcurrentHashMap<>();
... ... @@ -197,7 +203,38 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
197 203 if (!checkConnected(ctx, tcpMessage)) {
198 204 return;
199 205 }
  206 + log.error("【{}】设备【{}】收到数据【{}】", sessionId,deviceSessionCtx.getDeviceId(), tcpMessage.getMessage());
  207 + if (gatewaySessionHandler != null) {
  208 + processGatewayDeviceMsg(ctx, tcpMessage);
  209 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  210 + } else {
  211 + processDirectDeviceMsg(ctx,tcpMessage);
  212 + }
  213 + }
200 214
  215 +
  216 + private void processGatewayDeviceMsg(ChannelHandlerContext ctx, TCPMessage tcpMessage) {
  217 + log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage.getMessage());
  218 + try {
  219 + String topicName = tcpMessage.getTopic();
  220 + if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
  221 + gatewaySessionHandler.onDeviceAttributes(tcpMessage);
  222 + } else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
  223 + gatewaySessionHandler.onDeviceTelemetry(tcpMessage);
  224 +
  225 + } else if (deviceSessionCtx.isToDeviceRpcResponseTopic(topicName)) {
  226 +
  227 + } else {
  228 + transportService.reportActivity(deviceSessionCtx.getSessionInfo());
  229 + pushDeviceMsg(ctx,tcpMessage);
  230 + }
  231 + } catch (AdaptorException e) {
  232 + log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
  233 + ctx.close();
  234 + }
  235 + }
  236 +
  237 + private void processDirectDeviceMsg(ChannelHandlerContext ctx, TCPMessage tcpMessage) {
201 238 log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage.getMessage());
202 239 try {
203 240 String topicName = tcpMessage.getTopic();
... ... @@ -279,7 +316,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
279 316 // attrReqTopicType = TopicType.V2;
280 317 } else {
281 318 transportService.reportActivity(deviceSessionCtx.getSessionInfo());
282   - pushDeviceMsg(tcpMessage.getMessage());
  319 + pushDeviceMsg(ctx,tcpMessage);
283 320 }
284 321 } catch (AdaptorException e) {
285 322 log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, tcpMessage, e);
... ... @@ -322,18 +359,16 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
322 359 }
323 360 }
324 361
325   - private void ack(ChannelHandlerContext ctx, int msgId) {
326   - if (msgId > 0) {
327   - ctx.writeAndFlush(createMqttPubAckMsg(msgId));
328   - }
329   - }
330 362
331   - private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
  363 +
  364 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final String msgId, final TCPMessage msg) {
332 365 return new TransportServiceCallback<>() {
333 366 @Override
334 367 public void onSuccess(Void dummy) {
335 368 log.trace("[{}] Published msg: {}", sessionId, msg);
336   - ack(ctx, msgId);
  369 + if(StringUtils.isNotEmpty(msgId)){
  370 + pushDeviceMsg(ctx,msg);
  371 + }
337 372 }
338 373
339 374 @Override
... ... @@ -382,7 +417,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
382 417
383 418 private void sendOtaPackage(ChannelHandlerContext ctx, int msgId, String firmwareId, String requestId, int chunkSize, int chunk, OtaPackageType type) {
384 419 log.trace("[{}] Send firmware [{}] to device!", sessionId, firmwareId);
385   - ack(ctx, msgId);
  420 + pushDeviceMsg(ctx,new TCPMessage(requestId));
386 421 try {
387 422 byte[] firmwareChunk = context.getOtaPackageDataCache().get(firmwareId, chunkSize, chunk);
388 423 deviceSessionCtx.getPayloadAdaptor()
... ... @@ -521,7 +556,23 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
521 556 }
522 557 }
523 558
524   -
  559 + private void checkGatewaySession(SessionMetaData sessionMetaData) {
  560 + TransportDeviceInfo device = deviceSessionCtx.getDeviceInfo();
  561 + try {
  562 + JsonNode infoNode = context.getMapper().readTree(device.getAdditionalInfo());
  563 + if (infoNode != null) {
  564 + JsonNode gatewayNode = infoNode.get("gateway");
  565 + if (gatewayNode != null && gatewayNode.asBoolean()) {
  566 + gatewaySessionHandler = new TcpGatewaySessionHandler(deviceSessionCtx, sessionId);
  567 + if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
  568 + sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
  569 + }
  570 + }
  571 + }
  572 + } catch (IOException e) {
  573 + log.trace("[{}][{}] Failed to fetch device additional info", sessionId, device.getDeviceName(), e);
  574 + }
  575 + }
525 576
526 577 @Override
527 578 public void operationComplete(Future<? super Void> future) throws Exception {
... ... @@ -534,6 +585,9 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
534 585 log.debug("[{}] Client disconnected!", sessionId);
535 586 transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
536 587 transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  588 + if (gatewaySessionHandler != null) {
  589 + gatewaySessionHandler.onGatewayDisconnect();
  590 + }
537 591 deviceSessionCtx.setDisconnected();
538 592 }
539 593 deviceSessionCtx.release();
... ... @@ -553,7 +607,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
553 607 transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback<Void>() {
554 608 @Override
555 609 public void onSuccess(Void msg) {
556   - transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), TcpTransportHandler.this);
  610 + SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), TcpTransportHandler.this);
  611 + checkGatewaySession(sessionMetaData);
557 612 ctx.writeAndFlush(createTcpConnAckMsg(CONNECTION_ACCEPTED));
558 613 deviceSessionCtx.setConnected(true);
559 614 log.debug("[{}] Client connected!", sessionId);
... ... @@ -608,7 +663,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
608 663
609 664 @Override
610 665 public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
611   - log.error("【{}】下发RPC命令【】给设备", sessionId,rpcRequest.getParams());
  666 + log.error("【{}】下发RPC命令【{}】给设备", sessionId,rpcRequest.getParams());
612 667 TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
613 668 try {
614 669 adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
... ... @@ -622,7 +677,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
622 677 // }
623 678 // }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
624 679 // }
625   - var cf = pushDeviceMsg(payload.getMessage());
  680 + var cf = pushDeviceMsg(deviceSessionCtx.getChannel(), payload);
626 681 cf.addListener(result -> {
627 682 if (result.cause() == null) {
628 683 // if (!isAckExpected(payload)) {
... ... @@ -658,11 +713,12 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
658 713
659 714 /**
660 715 * 往设备推送消息
661   - * @param message
  716 + * @param tcp
662 717 * @return
663 718 */
664   - private ChannelFuture pushDeviceMsg(String message) {
  719 + private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx,TCPMessage tcp) {
665 720 try {
  721 + String message = tcp.getMessage();
666 722 byte[] payloadInBytes ;
667 723 if(deviceSessionCtx.getPayloadType().equals(TcpDataTypeEnum.HEX)){
668 724 payloadInBytes = ByteUtils.hexStr2Bytes(message);
... ... @@ -674,7 +730,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
674 730 // payload.writeBytes(payloadInBytes);
675 731 ByteBuf payload = Unpooled.copiedBuffer(payloadInBytes);
676 732
677   - return deviceSessionCtx.getChannel().writeAndFlush(payload);
  733 + return ctx.writeAndFlush(payload);
678 734 } catch (UnsupportedEncodingException e) {
679 735 log.error(e.getMessage(),e);
680 736 throw new RuntimeException(e);
... ...
1 1 /**
2 2 * Copyright © 2016-2022 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -34,8 +34,8 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
34 34 import org.thingsboard.server.common.yunteng.script.YtScriptInvokeService;
35 35 import org.thingsboard.server.common.yunteng.script.YtScriptType;
36 36 import org.thingsboard.server.gen.transport.TransportProtos;
37   -import org.thingsboard.server.transport.tcp.session.DeviceSessionCtx;
38 37 import org.thingsboard.server.transport.tcp.session.TCPMessage;
  38 +import org.thingsboard.server.transport.tcp.session.TcpDeviceWareSessionContext;
39 39
40 40 import java.io.UnsupportedEncodingException;
41 41 import java.nio.charset.Charset;
... ... @@ -55,8 +55,9 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
55 55 @Autowired
56 56 private YtScriptInvokeService jsEngine;
57 57 private static final JsonParser parser = new JsonParser();
  58 +
58 59 @Override
59   - public TransportProtos.PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, String inbound) throws AdaptorException {
  60 + public TransportProtos.PostTelemetryMsg convertToPostTelemetry(TcpDeviceWareSessionContext ctx, String inbound) throws AdaptorException {
60 61 try {
61 62 JsonElement payload = validatePayload(ctx, inbound, false);
62 63 return JsonConverter.convertToTelemetryProto(payload);
... ... @@ -76,7 +77,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
76 77 }
77 78
78 79 @Override
79   - public TransportProtos.PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  80 + public TransportProtos.PostAttributeMsg convertToPostAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
80 81 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
81 82 try {
82 83 return JsonConverter.convertToAttributesProto(new JsonParser().parse(payload));
... ... @@ -87,7 +88,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
87 88 }
88 89
89 90 @Override
90   - public TransportProtos.ClaimDeviceMsg convertToClaimDevice(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  91 + public TransportProtos.ClaimDeviceMsg convertToClaimDevice(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
91 92 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), true);
92 93 try {
93 94 return JsonConverter.convertToClaimDeviceProto(ctx.getDeviceId(), payload);
... ... @@ -98,7 +99,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
98 99 }
99 100
100 101 @Override
101   - public TransportProtos.ProvisionDeviceRequestMsg convertToProvisionRequestMsg(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException {
  102 + public TransportProtos.ProvisionDeviceRequestMsg convertToProvisionRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
102 103 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
103 104 try {
104 105 return JsonConverter.convertToProvisionRequestMsg(payload);
... ... @@ -108,72 +109,72 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
108 109 }
109 110
110 111 @Override
111   - public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
  112 + public TransportProtos.GetAttributeRequestMsg convertToGetAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
112 113 return processGetAttributeRequestMsg(inbound, topicBase);
113 114 }
114 115
115 116 @Override
116   - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, String inbound, String topicBase) throws AdaptorException {
  117 + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, String inbound, String topicBase) throws AdaptorException {
117 118 return processToDeviceRpcResponseMsg(inbound, topicBase);
118 119 }
119 120
120 121 @Override
121   - public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
  122 + public TransportProtos.ToServerRpcRequestMsg convertToServerRpcRequest(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
122 123 return processToServerRpcRequestMsg(ctx, inbound, topicBase);
123 124 }
124 125
125 126 @Override
126   - public Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
  127 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
127 128 return processConvertFromAttributeResponseMsg(ctx, responseMsg, topicBase);
128 129 }
129 130
130 131 @Override
131   - public Optional<TCPMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
  132 + public Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
132 133 return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg);
133 134 }
134 135
135 136 @Override
136   - public Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) {
  137 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) {
137 138 return Optional.of(createTcpMessage(ctx, JsonConverter.toJson(notificationMsg)));
138 139 }
139 140
140 141 @Override
141   - public Optional<TCPMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) {
  142 + public Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) {
142 143 JsonObject result = JsonConverter.getJsonObjectForGateway(deviceName, notificationMsg);
143 144 return Optional.of(createTcpMessage(ctx, result));
144 145 }
145 146
146 147 @Override
147   - public Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws UnsupportedEncodingException {
148   - byte[] result = null;
  148 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws UnsupportedEncodingException {
  149 + byte[] result = null;
149 150 String payload = rpcRequest.getParams();//methodThingskit
150 151 // if(ctx.getPayloadType().equals(TcpDataTypeEnum.ASCII)){
151 152 // }else{
152 153 // result= ByteUtils.hexToBytes(payload);
153 154 // }
154   - if(!payload.startsWith("{") && !payload.endsWith("}")){
155   - payload = payload.substring(1,payload.length()-1);
  155 + if (!payload.startsWith("{") && !payload.endsWith("}")) {
  156 + payload = payload.replace("\"","");;
156 157 }
157   - return Optional.of(createTcpMessage(ctx,payload));
  158 + return Optional.of(createTcpMessage(ctx, payload));
158 159 }
159 160
160 161 @Override
161   - public Optional<TCPMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  162 + public Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
162 163 return Optional.of(createTcpMessage(ctx, JsonConverter.toGatewayJson(deviceName, rpcRequest)));
163 164 }
164 165
165 166 @Override
166   - public Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) {
  167 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) {
167 168 return Optional.of(createTcpMessage(ctx, JsonConverter.toJson(rpcResponse)));
168 169 }
169 170
170 171 @Override
171   - public Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, TransportProtos.ProvisionDeviceResponseMsg provisionResponse) {
172   - return Optional.of(createTcpMessage(ctx,JsonConverter.toJson(provisionResponse)));
  172 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, TransportProtos.ProvisionDeviceResponseMsg provisionResponse) {
  173 + return Optional.of(createTcpMessage(ctx, JsonConverter.toJson(provisionResponse)));
173 174 }
174 175
175 176 @Override
176   - public Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) {
  177 + public Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) {
177 178 return Optional.of(null);
178 179 }
179 180
... ... @@ -222,7 +223,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
222 223 return null;
223 224 }
224 225
225   - private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
  226 + private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
226 227 String topicName = inbound.variableHeader().topicName();
227 228 String payload = validatePayload(ctx.getSessionId(), inbound.payload(), false);
228 229 try {
... ... @@ -234,7 +235,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
234 235 }
235 236 }
236 237
237   - private Optional<TCPMessage> processConvertFromAttributeResponseMsg(DeviceSessionCtx ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
  238 + private Optional<TCPMessage> processConvertFromAttributeResponseMsg(TcpDeviceWareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
238 239 if (!StringUtils.isEmpty(responseMsg.getError())) {
239 240 throw new AdaptorException(responseMsg.getError());
240 241 } else {
... ... @@ -247,17 +248,16 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
247 248 }
248 249 }
249 250
250   - private Optional<TCPMessage> processConvertFromGatewayAttributeResponseMsg(DeviceSessionCtx ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
  251 + private Optional<TCPMessage> processConvertFromGatewayAttributeResponseMsg(TcpDeviceWareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
251 252 if (!StringUtils.isEmpty(responseMsg.getError())) {
252 253 throw new AdaptorException(responseMsg.getError());
253 254 } else {
254 255 JsonObject result = JsonConverter.getJsonObjectForGateway(deviceName, responseMsg);
255   - return Optional.of(createTcpMessage(ctx, result));
  256 + return Optional.of(createTcpMessage(ctx, result));
256 257 }
257 258 }
258 259
259 260
260   -
261 261 private Set<String> toStringSet(JsonElement requestBody, String name) {
262 262 JsonElement element = requestBody.getAsJsonObject().get(name);
263 263 if (element != null) {
... ... @@ -278,7 +278,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
278 278 return payload;
279 279 }
280 280
281   - private JsonElement validatePayload(DeviceSessionCtx session, String payload, boolean isEmptyPayloadAllowed) throws AdaptorException, ExecutionException, InterruptedException {
  281 + private JsonElement validatePayload(TcpDeviceWareSessionContext session, String payload, boolean isEmptyPayloadAllowed) throws AdaptorException, ExecutionException, InterruptedException {
282 282 if (payload == null) {
283 283 log.debug("[{}] Payload is empty!", session.getSessionId());
284 284 if (!isEmptyPayloadAllowed) {
... ... @@ -305,7 +305,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
305 305 }
306 306
307 307
308   - protected TCPMessage createTcpMessage(DeviceSessionCtx ctx, JsonElement json) {
  308 + protected TCPMessage createTcpMessage(TcpDeviceWareSessionContext ctx, JsonElement json) {
309 309 // TCPMessage msg = new TCPMessage(MqttMessageType.PUBLISH,);
310 310 //// new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0);
311 311 // MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId());
... ... @@ -315,11 +315,11 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
315 315 return null;
316 316 }
317 317
318   - protected TCPMessage createTcpMessage(DeviceSessionCtx ctx, String payload) {
  318 + protected TCPMessage createTcpMessage(TcpDeviceWareSessionContext ctx, String payload) {
319 319 TCPMessage message = new TCPMessage(payload);
320   - message.setRequestId(payload.substring(0,4));
321   - message.setTopic(payload.substring(2,4));
322   - message.setDeviceCode(payload.substring(0,2));
  320 + message.setRequestId(payload.substring(0, 4));
  321 + message.setTopic(payload.substring(2, 4));
  322 + message.setDeviceCode(payload.substring(0, 2));
323 323 return message;
324 324 }
325 325
... ...
... ... @@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.ota.OtaPackageType;
21 21 import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;
22 22 import org.thingsboard.server.common.transport.adaptor.AdaptorException;
23 23 import org.thingsboard.server.gen.transport.TransportProtos.*;
24   -import org.thingsboard.server.transport.tcp.session.DeviceSessionCtx;
  24 +import org.thingsboard.server.transport.tcp.session.TcpDeviceWareSessionContext;
25 25 import org.thingsboard.server.transport.tcp.session.TCPMessage;
26 26
27 27 import java.io.UnsupportedEncodingException;
... ... @@ -32,44 +32,46 @@ import java.util.UUID;
32 32 import java.util.concurrent.ExecutionException;
33 33
34 34 /**
35   - * @author Andrew Shvayka
  35 + * 将收到的数据流转换为接口需要的数据格式
  36 + * 1、基于解析脚本将ByteBuf转JSON对象。
  37 + * 2、将JSON对象转PROTOBUF对象。
36 38 */
37 39 public interface TcpTransportAdaptor {
38 40 static char[] HEX_VOCABLE = {'0', '1', '2', '3', '4', '5', '6', '7',
39 41 '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
40 42 static final Charset UTF8 = StandardCharsets.UTF_8;
41   - PostTelemetryMsg convertToPostTelemetry(DeviceSessionCtx ctx, String inbound) throws AdaptorException;
  43 + PostTelemetryMsg convertToPostTelemetry(TcpDeviceWareSessionContext ctx, String inbound) throws AdaptorException;
42 44
43 45 UUID getJsScriptEngineFunctionId(String scriptBody, String... argNames) throws ExecutionException, InterruptedException;
44   - PostAttributeMsg convertToPostAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
  46 + PostAttributeMsg convertToPostAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
45 47
46   - GetAttributeRequestMsg convertToGetAttributes(DeviceSessionCtx ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
  48 + GetAttributeRequestMsg convertToGetAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
47 49
48   - ToDeviceRpcResponseMsg convertToDeviceRpcResponse(DeviceSessionCtx ctx, String mqttMsg, String topicBase) throws AdaptorException;
  50 + ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, String mqttMsg, String topicBase) throws AdaptorException;
49 51
50   - ToServerRpcRequestMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
  52 + ToServerRpcRequestMsg convertToServerRpcRequest(TcpDeviceWareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
51 53
52   - ClaimDeviceMsg convertToClaimDevice(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
  54 + ClaimDeviceMsg convertToClaimDevice(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
53 55
54   - Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException;
  56 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException;
55 57
56   - Optional<TCPMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException;
  58 + Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException;
57 59
58   - Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException;
  60 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException;
59 61
60   - Optional<TCPMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
  62 + Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
61 63
62   - Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException, UnsupportedEncodingException;
  64 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException, UnsupportedEncodingException;
63 65
64   - Optional<TCPMessage> convertToGatewayPublish(DeviceSessionCtx ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
  66 + Optional<TCPMessage> convertToGatewayPublish(TcpDeviceWareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
65 67
66   - Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException;
  68 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException;
67 69
68   - ProvisionDeviceRequestMsg convertToProvisionRequestMsg(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException;
  70 + ProvisionDeviceRequestMsg convertToProvisionRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
69 71
70   - Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException;
  72 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException;
71 73
72   - Optional<TCPMessage> convertToPublish(DeviceSessionCtx ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException;
  74 + Optional<TCPMessage> convertToPublish(TcpDeviceWareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException;
73 75 public static byte[] toBytes(ByteBuf inbound) {
74 76 byte[] bytes = new byte[inbound.readableBytes()];
75 77 int readerIndex = inbound.readerIndex();
... ... @@ -86,7 +88,7 @@ public interface TcpTransportAdaptor {
86 88 }
87 89 return sb.toString();
88 90 }
89   - default TCPMessage createTcpMessage(DeviceSessionCtx ctx, ByteBuf payload) {
  91 + default TCPMessage createTcpMessage(TcpDeviceWareSessionContext ctx, ByteBuf payload) {
90 92 String payloadStr;
91 93 if(ctx.getPayloadType().equals(TcpDataTypeEnum.HEX)){
92 94 byte[] payloadBytes = toBytes(payload);
... ...
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.tcp.session;
  17 +
  18 +import io.netty.channel.ChannelHandlerContext;
  19 +import io.netty.util.ReferenceCountUtil;
  20 +import lombok.Getter;
  21 +import lombok.Setter;
  22 +import lombok.extern.slf4j.Slf4j;
  23 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  24 +
  25 +import java.util.UUID;
  26 +import java.util.concurrent.ConcurrentLinkedQueue;
  27 +import java.util.concurrent.atomic.AtomicInteger;
  28 +import java.util.concurrent.locks.Lock;
  29 +import java.util.concurrent.locks.ReentrantLock;
  30 +import java.util.function.Consumer;
  31 +
  32 +/**
  33 + * @author Andrew Shvayka
  34 + */
  35 +@Slf4j
  36 +public class TcpDeviceSessionCtx extends TcpDeviceWareSessionContext {
  37 +
  38 + @Getter
  39 + @Setter
  40 + private ChannelHandlerContext channel;
  41 +
  42 +
  43 + private final AtomicInteger msgIdSeq = new AtomicInteger(0);
  44 +
  45 + private final ConcurrentLinkedQueue<TCPMessage> msgQueue = new ConcurrentLinkedQueue<>();
  46 +
  47 + @Getter
  48 + private final Lock msgQueueProcessorLock = new ReentrantLock();
  49 +
  50 + private final AtomicInteger msgQueueSize = new AtomicInteger(0);
  51 +
  52 + @Getter
  53 + @Setter
  54 + private boolean provisionOnly = false;
  55 +
  56 + public TcpDeviceSessionCtx(UUID sessionId, TcpTransportContext context) {
  57 + super(sessionId,context);
  58 + }
  59 +
  60 + public int nextMsgId() {
  61 + return msgIdSeq.incrementAndGet();
  62 + }
  63 +
  64 +
  65 +
  66 +
  67 +
  68 +
  69 +
  70 +
  71 +
  72 +
  73 +
  74 +
  75 +
  76 + public void addToQueue(TCPMessage msg) {
  77 + msgQueueSize.incrementAndGet();
  78 + ReferenceCountUtil.retain(msg);
  79 + msgQueue.add(msg);
  80 + }
  81 +
  82 + public void tryProcessQueuedMsgs(Consumer<TCPMessage> msgProcessor) {
  83 + while (!msgQueue.isEmpty()) {
  84 + if (msgQueueProcessorLock.tryLock()) {
  85 + try {
  86 + TCPMessage msg;
  87 + while ((msg = msgQueue.poll()) != null) {
  88 + try {
  89 + msgQueueSize.decrementAndGet();
  90 + msgProcessor.accept(msg);
  91 + } finally {
  92 + ReferenceCountUtil.safeRelease(msg);
  93 + }
  94 + }
  95 + } finally {
  96 + msgQueueProcessorLock.unlock();
  97 + }
  98 + } else {
  99 + return;
  100 + }
  101 + }
  102 + }
  103 +
  104 + public int getMsgQueueSize() {
  105 + return msgQueueSize.get();
  106 + }
  107 +
  108 + public void release() {
  109 + if (!msgQueue.isEmpty()) {
  110 + log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue", getDeviceId(), msgQueue.size());
  111 + msgQueue.forEach(ReferenceCountUtil::safeRelease);
  112 + msgQueue.clear();
  113 + }
  114 + }
  115 +
  116 +
  117 +}
... ...
common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/session/TcpDeviceWareSessionContext.java renamed from common/transport/tcp/src/main/java/org/thingsboard/server/transport/tcp/session/DeviceSessionCtx.java
... ... @@ -15,75 +15,56 @@
15 15 */
16 16 package org.thingsboard.server.transport.tcp.session;
17 17
18   -import io.netty.channel.ChannelHandlerContext;
19   -import io.netty.util.ReferenceCountUtil;
  18 +import com.fasterxml.jackson.core.JsonProcessingException;
  19 +import com.fasterxml.jackson.databind.JsonNode;
20 20 import lombok.Getter;
21   -import lombok.Setter;
22 21 import lombok.extern.slf4j.Slf4j;
23 22 import org.thingsboard.server.common.data.DeviceProfile;
24 23 import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
25 24 import org.thingsboard.server.common.data.device.profile.YtTcpDeviceProfileTransportConfiguration;
  25 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
26 26 import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;
  27 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
27 28 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
28 29 import org.thingsboard.server.gen.transport.TransportProtos;
29 30 import org.thingsboard.server.transport.tcp.TcpTransportContext;
30 31 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
31   -import org.thingsboard.server.transport.tcp.util.ByteUtils;
32 32
33 33 import java.util.UUID;
34   -import java.util.concurrent.ConcurrentLinkedQueue;
35 34 import java.util.concurrent.ExecutionException;
36   -import java.util.concurrent.atomic.AtomicInteger;
37   -import java.util.concurrent.locks.Lock;
38   -import java.util.concurrent.locks.ReentrantLock;
39   -import java.util.function.Consumer;
40 35
41 36 /**
42 37 * @author Andrew Shvayka
43 38 */
44 39 @Slf4j
45   -public class DeviceSessionCtx extends DeviceAwareSessionContext {
46   -
47   - @Getter
48   - @Setter
49   - private ChannelHandlerContext channel;
  40 +public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionContext {
50 41
51 42 @Getter
52 43 private final TcpTransportContext context;
53 44
54   - private final AtomicInteger msgIdSeq = new AtomicInteger(0);
55   -
56   - private final ConcurrentLinkedQueue<TCPMessage> msgQueue = new ConcurrentLinkedQueue<>();
57   -
58   - @Getter
59   - private final Lock msgQueueProcessorLock = new ReentrantLock();
60   -
61   - private final AtomicInteger msgQueueSize = new AtomicInteger(0);
62   -
63   - @Getter
64   - @Setter
65   - private boolean provisionOnly = false;
66   -
67 45 private volatile String telemetryTopicFilter ;
68 46 private volatile String attributesTopicFilter;
69 47 private volatile String toDeviceRpcResponseTopicFilter;
70 48 @Getter
71 49 private volatile TcpDataTypeEnum payloadType = TcpDataTypeEnum.HEX;
72 50
73   -
74 51 private volatile TcpTransportAdaptor adaptor;
  52 +
  53 + /**设备唯一标识符,例如:设备SN、设备地址码等。数据内携带标识符*/
  54 + @Getter
  55 + private volatile String deviceCode = "55";
  56 +
  57 +
75 58 @Getter
76 59 private UUID scriptId;
77 60
78   - public DeviceSessionCtx(UUID sessionId, TcpTransportContext context) {
  61 + public TcpDeviceWareSessionContext(UUID sessionId, TcpTransportContext context) {
79 62 super(sessionId);
80 63 this.context = context;
81 64 this.adaptor = context.getJsonTcpAdaptor();
82 65 }
83 66
84   - public int nextMsgId() {
85   - return msgIdSeq.incrementAndGet();
86   - }
  67 +
87 68 public boolean isDeviceTelemetryTopic(String topicName) {
88 69 return telemetryTopicFilter.equals(topicName);
89 70 }
... ... @@ -96,11 +77,23 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext {
96 77 }
97 78
98 79 public TcpTransportAdaptor getPayloadAdaptor() {
99   - return adaptor;
  80 + return this.adaptor;
100 81 }
101 82
102 83
  84 + @Override
  85 + public void setDeviceInfo(TransportDeviceInfo deviceInfo) {
  86 + super.setDeviceInfo(deviceInfo);
  87 + try {
  88 + JsonNode additionalInfo = context.getMapper().readTree(deviceInfo.getAdditionalInfo());
  89 + if(additionalInfo !=null && additionalInfo.has(FastIotConstants.TCP_DEVICE_IDENTIFY_FILED)){
  90 + deviceCode = additionalInfo.get(FastIotConstants.TCP_DEVICE_IDENTIFY_FILED).asText();
  91 + }
  92 + } catch (JsonProcessingException e) {
  93 + log.trace("[{}][{}] Failed to fetch device additional info", sessionId, deviceInfo.getDeviceName(), e);
  94 + }
103 95
  96 + }
104 97
105 98 @Override
106 99 public void setDeviceProfile(DeviceProfile deviceProfile) {
... ... @@ -138,50 +131,4 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext {
138 131 }
139 132 }
140 133
141   -
142   -
143   -
144   -
145   -
146   - public void addToQueue(TCPMessage msg) {
147   - msgQueueSize.incrementAndGet();
148   - ReferenceCountUtil.retain(msg);
149   - msgQueue.add(msg);
150   - }
151   -
152   - public void tryProcessQueuedMsgs(Consumer<TCPMessage> msgProcessor) {
153   - while (!msgQueue.isEmpty()) {
154   - if (msgQueueProcessorLock.tryLock()) {
155   - try {
156   - TCPMessage msg;
157   - while ((msg = msgQueue.poll()) != null) {
158   - try {
159   - msgQueueSize.decrementAndGet();
160   - msgProcessor.accept(msg);
161   - } finally {
162   - ReferenceCountUtil.safeRelease(msg);
163   - }
164   - }
165   - } finally {
166   - msgQueueProcessorLock.unlock();
167   - }
168   - } else {
169   - return;
170   - }
171   - }
172   - }
173   -
174   - public int getMsgQueueSize() {
175   - return msgQueueSize.get();
176   - }
177   -
178   - public void release() {
179   - if (!msgQueue.isEmpty()) {
180   - log.warn("doDisconnect for device {} but unprocessed messages {} left in the msg queue", getDeviceId(), msgQueue.size());
181   - msgQueue.forEach(ReferenceCountUtil::safeRelease);
182   - msgQueue.clear();
183   - }
184   - }
185   -
186   -
187 134 }
... ...
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.tcp.session;
  17 +
  18 +import io.netty.channel.ChannelFuture;
  19 +import io.netty.handler.codec.mqtt.MqttMessage;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.common.data.DeviceProfile;
  22 +import org.thingsboard.server.common.data.id.DeviceId;
  23 +import org.thingsboard.server.common.data.rpc.RpcStatus;
  24 +import org.thingsboard.server.common.transport.SessionMsgListener;
  25 +import org.thingsboard.server.common.transport.TransportService;
  26 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  27 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
  28 +import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
  29 +import org.thingsboard.server.gen.transport.TransportProtos;
  30 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  31 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  32 +
  33 +import java.util.UUID;
  34 +
  35 +/**
  36 + * Created by ashvayka on 19.01.17.
  37 + */
  38 +@Slf4j
  39 +public class TcpGatewayDeviceSessionCtx extends TcpDeviceWareSessionContext implements SessionMsgListener {
  40 +
  41 + private final TcpGatewaySessionHandler parent;
  42 + private final TransportService transportService;
  43 +
  44 + public TcpGatewayDeviceSessionCtx(TcpTransportContext context, TcpGatewaySessionHandler parent, TransportDeviceInfo deviceInfo,
  45 + DeviceProfile deviceProfile, TransportService transportService) {
  46 + super(UUID.randomUUID(),context);
  47 + this.parent = parent;
  48 + setSessionInfo(SessionInfoProto.newBuilder()
  49 + .setNodeId(parent.getNodeId())
  50 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  51 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  52 + .setDeviceIdMSB(deviceInfo.getDeviceId().getId().getMostSignificantBits())
  53 + .setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits())
  54 + .setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits())
  55 + .setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits())
  56 + .setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits())
  57 + .setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits())
  58 + .setDeviceName(deviceInfo.getDeviceName())
  59 + .setDeviceType(deviceInfo.getDeviceType())
  60 + .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
  61 + .setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits())
  62 + .setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits())
  63 + .setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits())
  64 + .build());
  65 + setDeviceInfo(deviceInfo);
  66 + setConnected(true);
  67 + setDeviceProfile(deviceProfile);
  68 + this.transportService = transportService;
  69 + }
  70 +
  71 + @Override
  72 + public UUID getSessionId() {
  73 + return sessionId;
  74 + }
  75 +
  76 + @Override
  77 + public int nextMsgId() {
  78 + return parent.nextMsgId();
  79 + }
  80 +
  81 + @Override
  82 + public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
  83 + try {
  84 + parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::pushDeviceMsg);
  85 + } catch (Exception e) {
  86 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  87 + }
  88 + }
  89 +
  90 + @Override
  91 + public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
  92 + log.trace("[{}] Received attributes update notification to device", sessionId);
  93 + try {
  94 + parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::pushDeviceMsg);
  95 + } catch (Exception e) {
  96 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  97 + }
  98 + }
  99 +
  100 + @Override
  101 + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
  102 + log.error("【{}】下发RPC命令【{}】给网关子设备", sessionId,request.getParams());
  103 + try {
  104 + parent.getPayloadAdaptor().convertToPublish(this, request).ifPresent(
  105 + payload -> {
  106 + ChannelFuture channelFuture = parent.pushDeviceMsg(payload);
  107 + if (request.getPersisted()) {
  108 + channelFuture.addListener(result -> {
  109 + if (result.cause() == null) {
  110 + transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  111 + }
  112 + });
  113 + }
  114 + }
  115 + );
  116 + } catch (Exception e) {
  117 + transportService.process(getSessionInfo(),
  118 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  119 + .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
  120 + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
  121 + }
  122 + }
  123 +
  124 + @Override
  125 + public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
  126 + log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
  127 + parent.deregisterSession(getDeviceInfo().getDeviceName());
  128 + }
  129 +
  130 + @Override
  131 + public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
  132 + // This feature is not supported in the TB IoT Gateway yet.
  133 + }
  134 +
  135 + @Override
  136 + public void onDeviceDeleted(DeviceId deviceId) {
  137 + parent.onDeviceDeleted(this.getSessionInfo().getDeviceName());
  138 + }
  139 +
  140 + private boolean isAckExpected(MqttMessage message) {
  141 + return message.fixedHeader().qosLevel().value() > 0;
  142 + }
  143 +
  144 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.transport.tcp.session;
  17 +
  18 +
  19 +import com.google.common.util.concurrent.FutureCallback;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import com.google.common.util.concurrent.SettableFuture;
  23 +import com.google.gson.*;
  24 +import com.google.protobuf.InvalidProtocolBufferException;
  25 +import com.google.protobuf.ProtocolStringList;
  26 +import io.netty.buffer.ByteBuf;
  27 +import io.netty.buffer.Unpooled;
  28 +import io.netty.channel.ChannelFuture;
  29 +import io.netty.channel.ChannelHandlerContext;
  30 +import io.netty.handler.codec.mqtt.MqttPublishMessage;
  31 +import lombok.extern.slf4j.Slf4j;
  32 +import org.springframework.util.CollectionUtils;
  33 +import org.springframework.util.ConcurrentReferenceHashMap;
  34 +import org.springframework.util.StringUtils;
  35 +import org.thingsboard.server.common.data.id.DeviceId;
  36 +import org.thingsboard.server.common.data.yunteng.enums.TcpDataTypeEnum;
  37 +import org.thingsboard.server.common.transport.TransportService;
  38 +import org.thingsboard.server.common.transport.TransportServiceCallback;
  39 +import org.thingsboard.server.common.transport.adaptor.AdaptorException;
  40 +import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  41 +import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
  42 +import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
  43 +import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
  44 +import org.thingsboard.server.gen.transport.TransportApiProtos;
  45 +import org.thingsboard.server.gen.transport.TransportProtos;
  46 +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
  47 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  48 +import org.thingsboard.server.transport.tcp.TcpTransportContext;
  49 +import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
  50 +import org.thingsboard.server.transport.tcp.util.ByteUtils;
  51 +
  52 +import javax.annotation.Nullable;
  53 +import java.io.UnsupportedEncodingException;
  54 +import java.util.*;
  55 +import java.util.concurrent.ConcurrentHashMap;
  56 +import java.util.concurrent.ConcurrentMap;
  57 +import java.util.concurrent.locks.Lock;
  58 +import java.util.concurrent.locks.ReentrantLock;
  59 +
  60 +import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType;
  61 +import static org.thingsboard.server.common.transport.service.DefaultTransportService.*;
  62 +
  63 +/**
  64 + * Created by ashvayka on 19.01.17.
  65 + */
  66 +@Slf4j
  67 +public class TcpGatewaySessionHandler {
  68 +
  69 + private static final String DEFAULT_DEVICE_TYPE = "default";
  70 + private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
  71 + private static final String DEVICE_PROPERTY = "device";
  72 +
  73 + private final TcpTransportContext context;
  74 + private final TransportService transportService;
  75 + private final TransportDeviceInfo gateway;
  76 + private final UUID sessionId;
  77 + private final ConcurrentMap<String, Lock> deviceCreationLockMap;
  78 + private final ConcurrentMap<String, TcpGatewayDeviceSessionCtx> devices;
  79 + private final ConcurrentMap<String, ListenableFuture<TcpGatewayDeviceSessionCtx>> deviceFutures;
  80 + private final ChannelHandlerContext channel;
  81 + private final TcpDeviceSessionCtx deviceSessionCtx;
  82 +
  83 + public TcpGatewaySessionHandler(TcpDeviceSessionCtx deviceSessionCtx, UUID sessionId) {
  84 + this.context = deviceSessionCtx.getContext();
  85 + this.transportService = context.getTransportService();
  86 + this.deviceSessionCtx = deviceSessionCtx;
  87 + this.gateway = deviceSessionCtx.getDeviceInfo();
  88 + this.sessionId = sessionId;
  89 + this.devices = new ConcurrentHashMap<>();
  90 + this.deviceFutures = new ConcurrentHashMap<>();
  91 + this.deviceCreationLockMap = createWeakMap();
  92 + this.channel = deviceSessionCtx.getChannel();
  93 + }
  94 +
  95 + ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
  96 + return new ConcurrentReferenceHashMap<>(16, ReferenceType.WEAK);
  97 + }
  98 +
  99 +
  100 +
  101 +
  102 +
  103 + public void onDeviceTelemetry(TCPMessage tcpMessage) throws AdaptorException {
  104 + Futures.addCallback(checkDeviceConnected(tcpMessage.getDeviceCode()),
  105 + new FutureCallback<>() {
  106 + @Override
  107 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  108 + String deviceName = deviceCtx.getDeviceInfo().getDeviceName();
  109 + try {
  110 + TransportProtos.PostTelemetryMsg postTelemetryMsg = deviceCtx.getPayloadAdaptor().convertToPostTelemetry(deviceCtx,tcpMessage.getMessage());
  111 + processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, tcpMessage.getRequestId());
  112 + } catch (Throwable e) {
  113 + log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, tcpMessage.getMessage(), e);
  114 + channel.close();
  115 + }
  116 + }
  117 +
  118 + @Override
  119 + public void onFailure(Throwable t) {
  120 + log.debug("[{}] Failed to process device telemetry command: {}", sessionId, tcpMessage.getDeviceCode(), t);
  121 + }
  122 + }, context.getExecutor());
  123 + }
  124 +
  125 +
  126 +
  127 +
  128 +
  129 +
  130 +
  131 + public void onGatewayDisconnect() {
  132 + devices.forEach(this::deregisterSession);
  133 + }
  134 +
  135 + public void onDeviceDeleted(String deviceName) {
  136 + deregisterSession(deviceName);
  137 + }
  138 +
  139 + public String getNodeId() {
  140 + return context.getNodeId();
  141 + }
  142 +
  143 + public UUID getSessionId() {
  144 + return sessionId;
  145 + }
  146 +
  147 + public TcpTransportAdaptor getPayloadAdaptor() {
  148 + return deviceSessionCtx.getPayloadAdaptor();
  149 + }
  150 +
  151 + void deregisterSession(String deviceName) {
  152 + TcpGatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
  153 + if (deviceSessionCtx != null) {
  154 + deregisterSession(deviceName, deviceSessionCtx);
  155 + } else {
  156 + log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName);
  157 + }
  158 + }
  159 +
  160 +
  161 +
  162 + int nextMsgId() {
  163 + return deviceSessionCtx.nextMsgId();
  164 + }
  165 +
  166 + private boolean isJsonPayloadType() {
  167 + return true;//deviceSessionCtx.isJsonPayloadType();
  168 + }
  169 +
  170 + private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
  171 + log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
  172 + Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  173 + @Override
  174 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx result) {
  175 + ack(msg);
  176 + log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
  177 + }
  178 +
  179 + @Override
  180 + public void onFailure(Throwable t) {
  181 + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t);
  182 +
  183 + }
  184 + }, context.getExecutor());
  185 + }
  186 +
  187 + private ListenableFuture<TcpGatewayDeviceSessionCtx> onDeviceConnect(String deviceCode, String deviceType) {
  188 + TcpGatewayDeviceSessionCtx result = devices.get(deviceCode);
  189 + if (result == null) {
  190 + Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceCode, s -> new ReentrantLock());
  191 + deviceCreationLock.lock();
  192 + try {
  193 + result = devices.get(deviceCode);
  194 + if (result == null) {
  195 + return getDeviceCreationFuture(deviceCode, deviceType);
  196 + } else {
  197 + return Futures.immediateFuture(result);
  198 + }
  199 + } finally {
  200 + deviceCreationLock.unlock();
  201 + }
  202 + } else {
  203 + return Futures.immediateFuture(result);
  204 + }
  205 + }
  206 +
  207 + private ListenableFuture<TcpGatewayDeviceSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
  208 + final SettableFuture<TcpGatewayDeviceSessionCtx> futureToSet = SettableFuture.create();
  209 + ListenableFuture<TcpGatewayDeviceSessionCtx> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
  210 + if (future != null) {
  211 + return future;
  212 + }
  213 + try {
  214 + transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
  215 + .setDeviceName(deviceName)
  216 + .setDeviceType(deviceType)
  217 + .setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
  218 + .setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits()).build(),
  219 + new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() {
  220 + @Override
  221 + public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
  222 + TcpGatewayDeviceSessionCtx deviceSessionCtx = new TcpGatewayDeviceSessionCtx(context,TcpGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), transportService);
  223 + if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
  224 + log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
  225 + SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
  226 + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
  227 + transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
  228 + .setSessionInfo(deviceSessionInfo)
  229 + .setSessionEvent(SESSION_EVENT_MSG_OPEN)
  230 + .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
  231 + .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
  232 + .build(), null);
  233 + }
  234 + futureToSet.set(devices.get(deviceName));
  235 + deviceFutures.remove(deviceName);
  236 + }
  237 +
  238 + @Override
  239 + public void onError(Throwable e) {
  240 + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
  241 + futureToSet.setException(e);
  242 + deviceFutures.remove(deviceName);
  243 + }
  244 + });
  245 + return futureToSet;
  246 + } catch (Throwable e) {
  247 + deviceFutures.remove(deviceName);
  248 + throw e;
  249 + }
  250 + }
  251 +
  252 + private int getMsgId(MqttPublishMessage mqttMsg) {
  253 + return mqttMsg.variableHeader().packetId();
  254 + }
  255 +
  256 + public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
  257 + JsonElement json = getJson(mqttMsg);
  258 + String deviceName = checkDeviceName(getDeviceName(json));
  259 + String deviceType = getDeviceType(json);
  260 + processOnConnect(mqttMsg, deviceName, deviceType);
  261 + }
  262 +
  263 +
  264 +
  265 + public void onDeviceDisconnect(MqttPublishMessage mqttMsg) throws AdaptorException {
  266 + String deviceName = checkDeviceName(getDeviceName(getJson(mqttMsg)));
  267 + processOnDisconnect(mqttMsg, deviceName);
  268 + }
  269 +
  270 +
  271 +
  272 + private void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
  273 + deregisterSession(deviceName);
  274 + ack(msg);
  275 + }
  276 +
  277 +
  278 +
  279 +
  280 + private void processPostTelemetryMsg(TcpGatewayDeviceSessionCtx deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, String msgId) {
  281 + transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
  282 + }
  283 + public void onDeviceClaim(MqttPublishMessage mqttMsg) throws AdaptorException {
  284 + int msgId = getMsgId(mqttMsg);
  285 + ByteBuf payload = mqttMsg.payload();
  286 + JsonElement json = null;//JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
  287 + if (json.isJsonObject()) {
  288 + JsonObject jsonObj = json.getAsJsonObject();
  289 + for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
  290 + String deviceName = deviceEntry.getKey();
  291 + Futures.addCallback(checkDeviceConnected(deviceName),
  292 + new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  293 + @Override
  294 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  295 + if (!deviceEntry.getValue().isJsonObject()) {
  296 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  297 + }
  298 + try {
  299 + DeviceId deviceId = deviceCtx.getDeviceId();
  300 + TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue());
  301 + processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId);
  302 + } catch (Throwable e) {
  303 + log.warn("[{}][{}] Failed to convert claim message: {}", gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
  304 + }
  305 + }
  306 +
  307 + @Override
  308 + public void onFailure(Throwable t) {
  309 + log.debug("[{}] Failed to process device claiming command: {}", sessionId, deviceName, t);
  310 + }
  311 + }, context.getExecutor());
  312 + }
  313 + } else {
  314 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  315 + }
  316 + }
  317 +
  318 +
  319 + private void processClaimDeviceMsg(TcpGatewayDeviceSessionCtx deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) {
  320 + transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId+"", claimDeviceMsg));
  321 + }
  322 +
  323 + public void onDeviceAttributes(TCPMessage mqttMsg) throws AdaptorException {
  324 + int msgId = 0;//getMsgId(mqttMsg);
  325 + ByteBuf payload = null;//mqttMsg.payload();
  326 + JsonElement json = null;//JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
  327 + if (json.isJsonObject()) {
  328 + JsonObject jsonObj = json.getAsJsonObject();
  329 + for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
  330 + String deviceName = deviceEntry.getKey();
  331 + Futures.addCallback(checkDeviceConnected(deviceName),
  332 + new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  333 + @Override
  334 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  335 + if (!deviceEntry.getValue().isJsonObject()) {
  336 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  337 + }
  338 + TransportProtos.PostAttributeMsg postAttributeMsg = JsonConverter.convertToAttributesProto(deviceEntry.getValue().getAsJsonObject());
  339 + processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
  340 + }
  341 +
  342 + @Override
  343 + public void onFailure(Throwable t) {
  344 + log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
  345 + }
  346 + }, context.getExecutor());
  347 + }
  348 + } else {
  349 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  350 + }
  351 + }
  352 +
  353 +
  354 + private void processPostAttributesMsg(TcpGatewayDeviceSessionCtx deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
  355 + transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId+"", postAttributeMsg));
  356 + }
  357 +
  358 + public void onDeviceAttributesRequest(MqttPublishMessage mqttMsg) throws AdaptorException {
  359 + JsonElement json = null;//JsonMqttAdaptor.validateJsonPayload(sessionId, msg.payload());
  360 + if (json.isJsonObject()) {
  361 + JsonObject jsonObj = json.getAsJsonObject();
  362 + int requestId = jsonObj.get("id").getAsInt();
  363 + String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
  364 + boolean clientScope = jsonObj.get("client").getAsBoolean();
  365 + Set<String> keys;
  366 + if (jsonObj.has("key")) {
  367 + keys = Collections.singleton(jsonObj.get("key").getAsString());
  368 + } else {
  369 + JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
  370 + keys = new HashSet<>();
  371 + for (JsonElement keyObj : keysArray) {
  372 + keys.add(keyObj.getAsString());
  373 + }
  374 + }
  375 + TransportProtos.GetAttributeRequestMsg requestMsg = toGetAttributeRequestMsg(requestId, clientScope, keys);
  376 + processGetAttributeRequestMessage(mqttMsg, deviceName, requestMsg);
  377 + } else {
  378 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  379 + }
  380 + }
  381 +
  382 +
  383 + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {
  384 + int msgId = getMsgId(mqttMsg);
  385 + ByteBuf payload = mqttMsg.payload();
  386 + JsonElement json = null;// JsonMqttAdaptor.validateJsonPayload(sessionId, payload);
  387 + if (json.isJsonObject()) {
  388 + JsonObject jsonObj = json.getAsJsonObject();
  389 + String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
  390 + Futures.addCallback(checkDeviceConnected(deviceName),
  391 + new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  392 + @Override
  393 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  394 + Integer requestId = jsonObj.get("id").getAsInt();
  395 + String data = jsonObj.get("data").toString();
  396 + TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  397 + .setRequestId(requestId).setPayload(data).build();
  398 + processRpcResponseMsg(deviceCtx, rpcResponseMsg, deviceName, msgId);
  399 + }
  400 +
  401 + @Override
  402 + public void onFailure(Throwable t) {
  403 + log.debug("[{}] Failed to process device Rpc response command: {}", sessionId, deviceName, t);
  404 + }
  405 + }, context.getExecutor());
  406 + } else {
  407 + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
  408 + }
  409 + }
  410 +
  411 +
  412 + private void processRpcResponseMsg(TcpGatewayDeviceSessionCtx deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) {
  413 + transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId+"", rpcResponseMsg));
  414 + }
  415 +
  416 + private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) {
  417 + int msgId = getMsgId(mqttMsg);
  418 + Futures.addCallback(checkDeviceConnected(deviceName),
  419 + new FutureCallback<TcpGatewayDeviceSessionCtx>() {
  420 + @Override
  421 + public void onSuccess(@Nullable TcpGatewayDeviceSessionCtx deviceCtx) {
  422 + transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId+"", requestMsg));
  423 + }
  424 +
  425 + @Override
  426 + public void onFailure(Throwable t) {
  427 + ack(mqttMsg);
  428 + log.debug("[{}] Failed to process device attributes request command: {}", sessionId, deviceName, t);
  429 + }
  430 + }, context.getExecutor());
  431 + }
  432 +
  433 + private TransportProtos.GetAttributeRequestMsg toGetAttributeRequestMsg(int requestId, boolean clientScope, Set<String> keys) {
  434 + TransportProtos.GetAttributeRequestMsg.Builder result = TransportProtos.GetAttributeRequestMsg.newBuilder();
  435 + result.setRequestId(requestId);
  436 +
  437 + if (clientScope) {
  438 + result.addAllClientAttributeNames(keys);
  439 + } else {
  440 + result.addAllSharedAttributeNames(keys);
  441 + }
  442 + return result.build();
  443 + }
  444 +
  445 + private ListenableFuture<TcpGatewayDeviceSessionCtx> checkDeviceConnected(String deviceCode) {
  446 + TcpGatewayDeviceSessionCtx ctx = devices.get(deviceCode);
  447 + if (ctx == null) {
  448 + log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceCode);
  449 + return onDeviceConnect(deviceCode, DEFAULT_DEVICE_TYPE);
  450 + } else {
  451 + return Futures.immediateFuture(ctx);
  452 + }
  453 + }
  454 +
  455 + private String checkDeviceName(String deviceName) {
  456 + if (StringUtils.isEmpty(deviceName)) {
  457 + throw new RuntimeException("Device name is empty!");
  458 + } else {
  459 + return deviceName;
  460 + }
  461 + }
  462 +
  463 + private String getDeviceName(JsonElement json) {
  464 + return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
  465 + }
  466 +
  467 + private String getDeviceType(JsonElement json) {
  468 + JsonElement type = json.getAsJsonObject().get("type");
  469 + return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
  470 + }
  471 +
  472 + private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
  473 + return null;//JsonMqttAdaptor.validateJsonPayload(sessionId, mqttMsg.payload());
  474 + }
  475 +
  476 +
  477 +
  478 +
  479 + private void deregisterSession(String deviceName, TcpGatewayDeviceSessionCtx deviceSessionCtx) {
  480 + transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
  481 + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
  482 + log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
  483 + }
  484 +
  485 + private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final String msgId, final T msg) {
  486 + return new TransportServiceCallback<Void>() {
  487 + @Override
  488 + public void onSuccess(Void dummy) {
  489 + log.trace("[{}][{}] Published msg: {}", sessionId, deviceName, msg);
  490 + if(!StringUtils.isEmpty(msgId)){
  491 + pushDeviceMsg(new TCPMessage(msgId));
  492 + }
  493 + }
  494 +
  495 + @Override
  496 + public void onError(Throwable e) {
  497 + log.trace("[{}] Failed to publish msg: {} for device: {}", sessionId, msg, deviceName, e);
  498 + ctx.close();
  499 + }
  500 + };
  501 + }
  502 + /**
  503 + * 往设备推送消息
  504 + * @param tcp
  505 + * @return
  506 + */
  507 + ChannelFuture pushDeviceMsg(TCPMessage tcp) {
  508 + try {
  509 + String message = tcp.getMessage();
  510 + byte[] payloadInBytes ;
  511 + if(deviceSessionCtx.getPayloadType().equals(TcpDataTypeEnum.HEX)){
  512 + payloadInBytes = ByteUtils.hexStr2Bytes(message);
  513 + }else{
  514 + payloadInBytes = message.getBytes(ByteUtils.UTF_8);
  515 + }
  516 +// ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
  517 +// ByteBuf payload = ALLOCATOR.buffer();
  518 +// payload.writeBytes(payloadInBytes);
  519 + ByteBuf payload = Unpooled.copiedBuffer(payloadInBytes);
  520 +
  521 + return channel.writeAndFlush(payload);
  522 + } catch (UnsupportedEncodingException e) {
  523 + log.error(e.getMessage(),e);
  524 + throw new RuntimeException(e);
  525 + }
  526 + }
  527 + private void ack(MqttPublishMessage msg) {
  528 + int msgId = getMsgId(msg);
  529 + if (msgId > 0) {
  530 + channel.writeAndFlush(msg);
  531 + }
  532 + }
  533 +}
  534 +
... ...