Commit 440d0ac730d3e858e060b33247055b6002d9a010

Authored by xp.Huang
2 parents 256cdab4 cab252b2

Merge branch '20230424' into 'master_dev'

fix: 命令下发响应功能

See merge request yunteng/thingskit!181
@@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.DataConstants; @@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.DataConstants;
38 import org.thingsboard.server.common.data.Device; 38 import org.thingsboard.server.common.data.Device;
39 import org.thingsboard.server.common.data.DeviceProfile; 39 import org.thingsboard.server.common.data.DeviceProfile;
40 import org.thingsboard.server.common.data.DeviceTransportType; 40 import org.thingsboard.server.common.data.DeviceTransportType;
  41 +import org.thingsboard.server.common.data.device.profile.MqttTopics;
41 import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration; 42 import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;
42 import org.thingsboard.server.common.data.id.DeviceId; 43 import org.thingsboard.server.common.data.id.DeviceId;
43 import org.thingsboard.server.common.data.rpc.RpcStatus; 44 import org.thingsboard.server.common.data.rpc.RpcStatus;
@@ -56,6 +57,7 @@ import org.thingsboard.server.common.transport.service.SessionMetaData; @@ -56,6 +57,7 @@ import org.thingsboard.server.common.transport.service.SessionMetaData;
56 import org.thingsboard.server.gen.transport.TransportProtos; 57 import org.thingsboard.server.gen.transport.TransportProtos;
57 import org.thingsboard.server.gen.transport.TransportProtos.ScriptProto; 58 import org.thingsboard.server.gen.transport.TransportProtos.ScriptProto;
58 import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry; 59 import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;
  60 +import org.thingsboard.server.transport.tcp.adaptors.TcpDownEntry;
59 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor; 61 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
60 import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry; 62 import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
61 import org.thingsboard.server.transport.tcp.script.TkScriptFactory; 63 import org.thingsboard.server.transport.tcp.script.TkScriptFactory;
@@ -102,9 +104,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -102,9 +104,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
102 volatile InetSocketAddress address; 104 volatile InetSocketAddress address;
103 105
104 volatile TcpGatewaySessionHandler gatewaySessionHandler; 106 volatile TcpGatewaySessionHandler gatewaySessionHandler;
105 - private final ConcurrentHashMap<String, String> otaPackSessions;  
106 - private final ConcurrentHashMap<String, Integer> chunkSizes;  
107 - private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck; 107 +
  108 +
108 private final ConcurrentMap<UUID, String> authScripts; 109 private final ConcurrentMap<UUID, String> authScripts;
109 private final AtomicInteger authedCounter = new AtomicInteger(); 110 private final AtomicInteger authedCounter = new AtomicInteger();
110 111
@@ -115,9 +116,6 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -115,9 +116,6 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
115 this.transportService = context.getTransportService(); 116 this.transportService = context.getTransportService();
116 this.sslHandler = sslHandler; 117 this.sslHandler = sslHandler;
117 this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context); 118 this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
118 - this.otaPackSessions = new ConcurrentHashMap<>();  
119 - this.chunkSizes = new ConcurrentHashMap<>();  
120 - this.rpcAwaitingAck = new ConcurrentHashMap<>();  
121 this.authScripts = new ConcurrentHashMap<>(); 119 this.authScripts = new ConcurrentHashMap<>();
122 List<ScriptProto> authScripts = this.transportService.getScripts(ScriptProto.newBuilder().setFunctionType(TkScriptFunctionType.TRANSPORT_TCP_AUTH.ordinal()).build()); 120 List<ScriptProto> authScripts = this.transportService.getScripts(ScriptProto.newBuilder().setFunctionType(TkScriptFunctionType.TRANSPORT_TCP_AUTH.ordinal()).build());
123 authScripts.stream().forEach(i -> { 121 authScripts.stream().forEach(i -> {
@@ -253,7 +251,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -253,7 +251,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
253 if (tcpMessage.getTelemetry()) { 251 if (tcpMessage.getTelemetry()) {
254 gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString()); 252 gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString());
255 } else { 253 } else {
256 - gatewaySessionHandler.onDeviceAttributes(devName, tcpMessage.getRequestId(), param.toString()); 254 +// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());
257 } 255 }
258 256
259 }); 257 });
@@ -278,8 +276,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -278,8 +276,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
278 TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr); 276 TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
279 transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage)); 277 transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));
280 } else { 278 } else {
281 - TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, dataStr);  
282 - transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, tcpMessage)); 279 + TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
  280 + transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
283 } 281 }
284 } else { 282 } else {
285 transportService.reportActivity(deviceSessionCtx.getSessionInfo()); 283 transportService.reportActivity(deviceSessionCtx.getSessionInfo());
@@ -345,7 +343,9 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -345,7 +343,9 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
345 if (null != accessToken.getUserName()) { 343 if (null != accessToken.getUserName()) {
346 } 344 }
347 String token = accessToken.getPassword(); 345 String token = accessToken.getPassword();
348 - if (null == token || Pattern.compile(".*\\s+.*").matcher(token).matches()) { 346 + if (null == token
  347 + || StringUtils.isEmpty(token)
  348 + || Pattern.compile(".*[\\s\u0000]+.*").matcher(token).matches()) {
349 onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID); 349 onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
350 return; 350 return;
351 } 351 }
@@ -522,25 +522,11 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -522,25 +522,11 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
522 TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor(); 522 TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
523 try { 523 try {
524 adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> { 524 adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
525 - int msgId = rpcRequest.getRequestId();  
526 - boolean oneWay = rpcRequest.getOneway();  
527 - if (!oneWay) {  
528 - rpcAwaitingAck.put(msgId, rpcRequest);  
529 - context.getScheduler().schedule(() -> {  
530 - TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);  
531 - if (msg != null) {  
532 - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);  
533 - }  
534 - }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);  
535 - } 525 + deviceSessionCtx.rpcRequesting(payload.getIdentifier(),rpcRequest);
536 var cf = pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas()); 526 var cf = pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas());
537 cf.addListener(result -> { 527 cf.addListener(result -> {
538 if (result.cause() == null) { 528 if (result.cause() == null) {
539 - if (oneWay) {  
540 - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);  
541 - } else if (rpcRequest.getPersisted()) {  
542 - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);  
543 - } 529 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
544 } else { 530 } else {
545 // TODO: send error 531 // TODO: send error
546 } 532 }
@@ -554,6 +540,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements @@ -554,6 +540,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
554 } 540 }
555 } 541 }
556 542
  543 +
557 @Override 544 @Override
558 public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) { 545 public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
559 log.debug("[{}] 服务端响应设备的RPC请求", sessionId); 546 log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
@@ -99,8 +99,18 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor { @@ -99,8 +99,18 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
99 } 99 }
100 100
101 @Override 101 @Override
102 - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, String inbound, String topicBase) throws AdaptorException {  
103 - return processToDeviceRpcResponseMsg(inbound, topicBase); 102 + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, TcpUpEntry inbound) throws AdaptorException {
  103 + try {
  104 + Integer requestId = ctx.getRpcRequesting(inbound.getIdentifier());
  105 + if(requestId != null){
  106 + String payload = JacksonUtil.toString(inbound.getDatas());
  107 + return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
  108 + }
  109 + return null;
  110 + } catch (RuntimeException e) {
  111 + log.debug("Failed to decode rpc response", e);
  112 + throw new AdaptorException(e);
  113 + }
104 } 114 }
105 115
106 @Override 116 @Override
@@ -134,6 +144,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor { @@ -134,6 +144,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
134 } 144 }
135 TcpDownEntry data = new TcpDownEntry(); 145 TcpDownEntry data = new TcpDownEntry();
136 data.setDatas(payload); 146 data.setDatas(payload);
  147 + data.setIdentifier(payload);
137 return Optional.of(data); 148 return Optional.of(data);
138 } 149 }
139 150
@@ -189,18 +200,6 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor { @@ -189,18 +200,6 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
189 } 200 }
190 } 201 }
191 202
192 - private TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(String inbound, String topicBase) throws AdaptorException {  
193 -// String topicName = inbound.variableHeader().topicName();  
194 -// try {  
195 -// int requestId = getRequestId(topicName, topicBase);  
196 -// String payload = inbound.payload().toString(UTF8);  
197 -// return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();  
198 -// } catch (RuntimeException e) {  
199 -// log.debug("Failed to decode rpc response", e);  
200 -// throw new AdaptorException(e);  
201 -// }  
202 - return null;  
203 - }  
204 203
205 private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException { 204 private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
206 String topicName = inbound.variableHeader().topicName(); 205 String topicName = inbound.variableHeader().topicName();
@@ -18,9 +18,9 @@ public class TcpDownEntry implements Serializable { @@ -18,9 +18,9 @@ public class TcpDownEntry implements Serializable {
18 */ 18 */
19 private UUID requestId; 19 private UUID requestId;
20 /** 20 /**
21 - * 设备名称、设备标识 21 + * 数据唯一标识
22 */ 22 */
23 - private String deviceName; 23 + private String identifier;
24 /** 24 /**
25 * 下发给设备的最终内容 25 * 下发给设备的最终内容
26 */ 26 */
@@ -53,7 +53,7 @@ public interface TcpTransportAdaptor { @@ -53,7 +53,7 @@ public interface TcpTransportAdaptor {
53 53
54 GetAttributeRequestMsg convertToGetAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException; 54 GetAttributeRequestMsg convertToGetAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
55 55
56 - ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, String mqttMsg, String topicBase) throws AdaptorException; 56 + ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, TcpUpEntry inbound) throws AdaptorException;
57 57
58 ToServerRpcRequestMsg convertToServerRpcRequest(TcpDeviceWareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException; 58 ToServerRpcRequestMsg convertToServerRpcRequest(TcpDeviceWareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
59 59
@@ -33,9 +33,9 @@ public class TcpUpEntry implements Serializable { @@ -33,9 +33,9 @@ public class TcpUpEntry implements Serializable {
33 */ 33 */
34 private String ackMsg; 34 private String ackMsg;
35 /** 35 /**
36 - * 设备名称、设备标识 36 + * 数据唯一标识
37 */ 37 */
38 - private String deviceName; 38 + private String identifier;
39 /** 39 /**
40 * 指标采集时间 40 * 指标采集时间
41 */ 41 */
@@ -20,6 +20,9 @@ import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor; @@ -20,6 +20,9 @@ import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
20 import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry; 20 import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
21 21
22 import java.util.UUID; 22 import java.util.UUID;
  23 +import java.util.concurrent.ConcurrentHashMap;
  24 +import java.util.concurrent.ConcurrentMap;
  25 +import java.util.concurrent.TimeUnit;
23 import java.util.function.Consumer; 26 import java.util.function.Consumer;
24 27
25 /** 28 /**
@@ -36,7 +39,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont @@ -36,7 +39,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
36 private volatile UUID telemetryScriptId; 39 private volatile UUID telemetryScriptId;
37 @Getter 40 @Getter
38 private volatile UUID rpcScriptId; 41 private volatile UUID rpcScriptId;
39 - 42 + private final ConcurrentMap<String, Integer> rpcAwaitingAck;
40 /** 43 /**
41 * 设备唯一标识符,例如:设备SN、设备地址码等。数据内携带标识符 44 * 设备唯一标识符,例如:设备SN、设备地址码等。数据内携带标识符
42 */ 45 */
@@ -48,6 +51,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont @@ -48,6 +51,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
48 super(sessionId); 51 super(sessionId);
49 this.context = context; 52 this.context = context;
50 this.adaptor = context.getJsonTcpAdaptor(); 53 this.adaptor = context.getJsonTcpAdaptor();
  54 + this.rpcAwaitingAck = new ConcurrentHashMap<>();
51 } 55 }
52 56
53 57
@@ -138,4 +142,21 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont @@ -138,4 +142,21 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
138 }, MoreExecutors.directExecutor()); 142 }, MoreExecutors.directExecutor());
139 } 143 }
140 144
  145 + /**
  146 + * 管理RPC请求信息
  147 + * @param rpcRequest 请求数据
  148 + * @param identifier 请求唯一标识
  149 + */
  150 + public void rpcRequesting(String identifier,TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  151 + boolean oneWay = rpcRequest.getOneway();
  152 + if (!oneWay) {
  153 + rpcAwaitingAck.put(identifier, rpcRequest.getRequestId());
  154 + context.getScheduler().schedule(() -> {
  155 + rpcAwaitingAck.remove(identifier);
  156 + }, Math.max(0, Math.min(getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
  157 + }
  158 + }
  159 + public Integer getRpcRequesting(String identifier){
  160 + return rpcAwaitingAck.remove(identifier);
  161 + }
141 } 162 }
@@ -30,6 +30,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; @@ -30,6 +30,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
30 import org.thingsboard.server.transport.tcp.TcpTransportContext; 30 import org.thingsboard.server.transport.tcp.TcpTransportContext;
31 31
32 import java.util.UUID; 32 import java.util.UUID;
  33 +import java.util.concurrent.TimeUnit;
33 34
34 /** 35 /**
35 * Created by ashvayka on 19.01.17. 36 * Created by ashvayka on 19.01.17.
@@ -93,15 +94,12 @@ public class TcpGatewayDeviceSessionCtx extends TcpDeviceWareSessionContext impl @@ -93,15 +94,12 @@ public class TcpGatewayDeviceSessionCtx extends TcpDeviceWareSessionContext impl
93 try { 94 try {
94 parent.getPayloadAdaptor().convertToPublish(this, request).ifPresent( 95 parent.getPayloadAdaptor().convertToPublish(this, request).ifPresent(
95 payload -> { 96 payload -> {
96 - boolean oneWay = request.getOneway(); 97 + rpcRequesting(payload.getIdentifier(),request);
97 ChannelFuture channelFuture = parent.pushDeviceMsg(payload.getDatas()); 98 ChannelFuture channelFuture = parent.pushDeviceMsg(payload.getDatas());
98 if (request.getPersisted()) { 99 if (request.getPersisted()) {
99 channelFuture.addListener(result -> { 100 channelFuture.addListener(result -> {
100 - if (oneWay) { 101 + if (result.cause() == null) {
101 transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); 102 transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
102 - } else if (request.getPersisted()) {  
103 - transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);  
104 -  
105 } 103 }
106 }); 104 });
107 } 105 }