Commit cab252b232c09ebd32bc412b516c442f22443eba

Authored by 芯火源
1 parent 256cdab4

fix: 命令下发响应功能

1、TCP命令下发响应功能调整
2、TCP上下行脚本标识deviceName改为identifier
3、加强鉴权脚本返回的访问令牌有效性校验
... ... @@ -38,6 +38,7 @@ import org.thingsboard.server.common.data.DataConstants;
38 38 import org.thingsboard.server.common.data.Device;
39 39 import org.thingsboard.server.common.data.DeviceProfile;
40 40 import org.thingsboard.server.common.data.DeviceTransportType;
  41 +import org.thingsboard.server.common.data.device.profile.MqttTopics;
41 42 import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration;
42 43 import org.thingsboard.server.common.data.id.DeviceId;
43 44 import org.thingsboard.server.common.data.rpc.RpcStatus;
... ... @@ -56,6 +57,7 @@ import org.thingsboard.server.common.transport.service.SessionMetaData;
56 57 import org.thingsboard.server.gen.transport.TransportProtos;
57 58 import org.thingsboard.server.gen.transport.TransportProtos.ScriptProto;
58 59 import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry;
  60 +import org.thingsboard.server.transport.tcp.adaptors.TcpDownEntry;
59 61 import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
60 62 import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
61 63 import org.thingsboard.server.transport.tcp.script.TkScriptFactory;
... ... @@ -102,9 +104,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
102 104 volatile InetSocketAddress address;
103 105
104 106 volatile TcpGatewaySessionHandler gatewaySessionHandler;
105   - private final ConcurrentHashMap<String, String> otaPackSessions;
106   - private final ConcurrentHashMap<String, Integer> chunkSizes;
107   - private final ConcurrentMap<Integer, TransportProtos.ToDeviceRpcRequestMsg> rpcAwaitingAck;
  107 +
  108 +
108 109 private final ConcurrentMap<UUID, String> authScripts;
109 110 private final AtomicInteger authedCounter = new AtomicInteger();
110 111
... ... @@ -115,9 +116,6 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
115 116 this.transportService = context.getTransportService();
116 117 this.sslHandler = sslHandler;
117 118 this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context);
118   - this.otaPackSessions = new ConcurrentHashMap<>();
119   - this.chunkSizes = new ConcurrentHashMap<>();
120   - this.rpcAwaitingAck = new ConcurrentHashMap<>();
121 119 this.authScripts = new ConcurrentHashMap<>();
122 120 List<ScriptProto> authScripts = this.transportService.getScripts(ScriptProto.newBuilder().setFunctionType(TkScriptFunctionType.TRANSPORT_TCP_AUTH.ordinal()).build());
123 121 authScripts.stream().forEach(i -> {
... ... @@ -253,7 +251,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
253 251 if (tcpMessage.getTelemetry()) {
254 252 gatewaySessionHandler.onDeviceTelemetry(devName, tcpMessage.getRequestId(), param.toString());
255 253 } else {
256   - gatewaySessionHandler.onDeviceAttributes(devName, tcpMessage.getRequestId(), param.toString());
  254 +// gatewaySessionHandler.onDeviceRpcResponse(devName, tcpMessage.getRequestId(), param.toString());
257 255 }
258 256
259 257 });
... ... @@ -278,8 +276,8 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
278 276 TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
279 277 transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, tcpMessage));
280 278 } else {
281   - TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, dataStr);
282   - transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, tcpMessage));
  279 + TransportProtos.ToDeviceRpcResponseMsg postRpcMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, tcpMessage);
  280 + transportService.process(deviceSessionCtx.getSessionInfo(), postRpcMsg, getPubAckCallback(ctx, tcpMessage));
283 281 }
284 282 } else {
285 283 transportService.reportActivity(deviceSessionCtx.getSessionInfo());
... ... @@ -345,7 +343,9 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
345 343 if (null != accessToken.getUserName()) {
346 344 }
347 345 String token = accessToken.getPassword();
348   - if (null == token || Pattern.compile(".*\\s+.*").matcher(token).matches()) {
  346 + if (null == token
  347 + || StringUtils.isEmpty(token)
  348 + || Pattern.compile(".*[\\s\u0000]+.*").matcher(token).matches()) {
349 349 onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
350 350 return;
351 351 }
... ... @@ -522,25 +522,11 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
522 522 TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
523 523 try {
524 524 adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
525   - int msgId = rpcRequest.getRequestId();
526   - boolean oneWay = rpcRequest.getOneway();
527   - if (!oneWay) {
528   - rpcAwaitingAck.put(msgId, rpcRequest);
529   - context.getScheduler().schedule(() -> {
530   - TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
531   - if (msg != null) {
532   - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
533   - }
534   - }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
535   - }
  525 + deviceSessionCtx.rpcRequesting(payload.getIdentifier(),rpcRequest);
536 526 var cf = pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas());
537 527 cf.addListener(result -> {
538 528 if (result.cause() == null) {
539   - if (oneWay) {
540   - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
541   - } else if (rpcRequest.getPersisted()) {
542   - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
543   - }
  529 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
544 530 } else {
545 531 // TODO: send error
546 532 }
... ... @@ -554,6 +540,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
554 540 }
555 541 }
556 542
  543 +
557 544 @Override
558 545 public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
559 546 log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
... ...
... ... @@ -99,8 +99,18 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
99 99 }
100 100
101 101 @Override
102   - public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, String inbound, String topicBase) throws AdaptorException {
103   - return processToDeviceRpcResponseMsg(inbound, topicBase);
  102 + public TransportProtos.ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, TcpUpEntry inbound) throws AdaptorException {
  103 + try {
  104 + Integer requestId = ctx.getRpcRequesting(inbound.getIdentifier());
  105 + if(requestId != null){
  106 + String payload = JacksonUtil.toString(inbound.getDatas());
  107 + return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
  108 + }
  109 + return null;
  110 + } catch (RuntimeException e) {
  111 + log.debug("Failed to decode rpc response", e);
  112 + throw new AdaptorException(e);
  113 + }
104 114 }
105 115
106 116 @Override
... ... @@ -134,6 +144,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
134 144 }
135 145 TcpDownEntry data = new TcpDownEntry();
136 146 data.setDatas(payload);
  147 + data.setIdentifier(payload);
137 148 return Optional.of(data);
138 149 }
139 150
... ... @@ -189,18 +200,6 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
189 200 }
190 201 }
191 202
192   - private TransportProtos.ToDeviceRpcResponseMsg processToDeviceRpcResponseMsg(String inbound, String topicBase) throws AdaptorException {
193   -// String topicName = inbound.variableHeader().topicName();
194   -// try {
195   -// int requestId = getRequestId(topicName, topicBase);
196   -// String payload = inbound.payload().toString(UTF8);
197   -// return TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build();
198   -// } catch (RuntimeException e) {
199   -// log.debug("Failed to decode rpc response", e);
200   -// throw new AdaptorException(e);
201   -// }
202   - return null;
203   - }
204 203
205 204 private TransportProtos.ToServerRpcRequestMsg processToServerRpcRequestMsg(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException {
206 205 String topicName = inbound.variableHeader().topicName();
... ...
... ... @@ -18,9 +18,9 @@ public class TcpDownEntry implements Serializable {
18 18 */
19 19 private UUID requestId;
20 20 /**
21   - * 设备名称、设备标识
  21 + * 数据唯一标识
22 22 */
23   - private String deviceName;
  23 + private String identifier;
24 24 /**
25 25 * 下发给设备的最终内容
26 26 */
... ...
... ... @@ -53,7 +53,7 @@ public interface TcpTransportAdaptor {
53 53
54 54 GetAttributeRequestMsg convertToGetAttributes(TcpDeviceWareSessionContext ctx, MqttPublishMessage inbound, String topicBase) throws AdaptorException;
55 55
56   - ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, String mqttMsg, String topicBase) throws AdaptorException;
  56 + ToDeviceRpcResponseMsg convertToDeviceRpcResponse(TcpDeviceWareSessionContext ctx, TcpUpEntry inbound) throws AdaptorException;
57 57
58 58 ToServerRpcRequestMsg convertToServerRpcRequest(TcpDeviceWareSessionContext ctx, MqttPublishMessage mqttMsg, String topicBase) throws AdaptorException;
59 59
... ...
... ... @@ -33,9 +33,9 @@ public class TcpUpEntry implements Serializable {
33 33 */
34 34 private String ackMsg;
35 35 /**
36   - * 设备名称、设备标识
  36 + * 数据唯一标识
37 37 */
38   - private String deviceName;
  38 + private String identifier;
39 39 /**
40 40 * 指标采集时间
41 41 */
... ...
... ... @@ -20,6 +20,9 @@ import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor;
20 20 import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry;
21 21
22 22 import java.util.UUID;
  23 +import java.util.concurrent.ConcurrentHashMap;
  24 +import java.util.concurrent.ConcurrentMap;
  25 +import java.util.concurrent.TimeUnit;
23 26 import java.util.function.Consumer;
24 27
25 28 /**
... ... @@ -36,7 +39,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
36 39 private volatile UUID telemetryScriptId;
37 40 @Getter
38 41 private volatile UUID rpcScriptId;
39   -
  42 + private final ConcurrentMap<String, Integer> rpcAwaitingAck;
40 43 /**
41 44 * 设备唯一标识符,例如:设备SN、设备地址码等。数据内携带标识符
42 45 */
... ... @@ -48,6 +51,7 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
48 51 super(sessionId);
49 52 this.context = context;
50 53 this.adaptor = context.getJsonTcpAdaptor();
  54 + this.rpcAwaitingAck = new ConcurrentHashMap<>();
51 55 }
52 56
53 57
... ... @@ -138,4 +142,21 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont
138 142 }, MoreExecutors.directExecutor());
139 143 }
140 144
  145 + /**
  146 + * 管理RPC请求信息
  147 + * @param rpcRequest 请求数据
  148 + * @param identifier 请求唯一标识
  149 + */
  150 + public void rpcRequesting(String identifier,TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
  151 + boolean oneWay = rpcRequest.getOneway();
  152 + if (!oneWay) {
  153 + rpcAwaitingAck.put(identifier, rpcRequest.getRequestId());
  154 + context.getScheduler().schedule(() -> {
  155 + rpcAwaitingAck.remove(identifier);
  156 + }, Math.max(0, Math.min(getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
  157 + }
  158 + }
  159 + public Integer getRpcRequesting(String identifier){
  160 + return rpcAwaitingAck.remove(identifier);
  161 + }
141 162 }
... ...
... ... @@ -30,6 +30,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
30 30 import org.thingsboard.server.transport.tcp.TcpTransportContext;
31 31
32 32 import java.util.UUID;
  33 +import java.util.concurrent.TimeUnit;
33 34
34 35 /**
35 36 * Created by ashvayka on 19.01.17.
... ... @@ -93,15 +94,12 @@ public class TcpGatewayDeviceSessionCtx extends TcpDeviceWareSessionContext impl
93 94 try {
94 95 parent.getPayloadAdaptor().convertToPublish(this, request).ifPresent(
95 96 payload -> {
96   - boolean oneWay = request.getOneway();
  97 + rpcRequesting(payload.getIdentifier(),request);
97 98 ChannelFuture channelFuture = parent.pushDeviceMsg(payload.getDatas());
98 99 if (request.getPersisted()) {
99 100 channelFuture.addListener(result -> {
100   - if (oneWay) {
  101 + if (result.cause() == null) {
101 102 transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
102   - } else if (request.getPersisted()) {
103   - transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
104   -
105 103 }
106 104 });
107 105 }
... ...