Commit 1757e9b8266d2dc97c6d8e1b16a783f51be378f0

Authored by xp.Huang
2 parents 592d4417 5b4974eb

Merge branch '20230420' into 'master_dev'

fix: 设备鉴权前校验凭证有效性

See merge request yunteng/thingskit!179
... ... @@ -71,7 +71,9 @@ import java.util.Optional;
71 71 import java.util.UUID;
72 72 import java.util.concurrent.ConcurrentHashMap;
73 73 import java.util.concurrent.ConcurrentMap;
  74 +import java.util.concurrent.TimeUnit;
74 75 import java.util.concurrent.atomic.AtomicInteger;
  76 +import java.util.regex.Pattern;
75 77
76 78 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
77 79 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
... ... @@ -219,27 +221,22 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
219 221 deviceSessionCtx.doUpScript(tcpMessage, r -> {
220 222 if (gatewaySessionHandler != null) {
221 223 processGatewayDeviceMsg(ctx, r);
222   -
223   - if (!hasDatas(r.getDatas(), true)) {
224   - transportService.reportActivity(deviceSessionCtx.getSessionInfo());
225   - return;
226   - }
227 224 }
228   -
229   -
230 225 processDirectDeviceMsg(ctx, r);
231 226 });
232 227
233 228 }
234 229
235 230
236   - private boolean hasDatas(Map<String, Object> datas, boolean includeSource) {
  231 + /**
  232 + * 上行脚本解析结果是否包含数据
  233 + * @param datas 数据集合
  234 + * @return
  235 + */
  236 + private boolean hasDatas(Map<String, Object> datas) {
237 237 if (datas == null || datas.isEmpty()) {
238 238 return false;
239 239 }
240   - if (includeSource && !datas.containsKey(TkScriptFactory.ORIGINAL_DATA_FILED)) {
241   - return false;
242   - }
243 240 return true;
244 241 }
245 242
... ... @@ -248,7 +245,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
248 245 log.trace("[{}][{}] Processing publish msg [{}]!", sessionId, deviceSessionCtx.getDeviceId(), tcpMessage);
249 246 try {
250 247 Map<String, Object> datas = tcpMessage.getDatas();
251   - if (hasDatas(datas, false)) {
  248 + if (hasDatas(datas)) {
252 249 datas.forEach((devName, param) -> {
253 250 if (TkScriptFactory.ORIGINAL_DATA_FILED.equals(devName)) {
254 251 return;
... ... @@ -275,7 +272,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
275 272 try {
276 273 TcpTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
277 274 Map<String, Object> datas = tcpMessage.getDatas();
278   - if (hasDatas(datas, false)) {
  275 + if (hasDatas(datas)) {
279 276 String dataStr = JacksonUtil.toString(datas);
280 277 if (tcpMessage.getTelemetry()) {
281 278 TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, dataStr);
... ... @@ -347,11 +344,12 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
347 344 }
348 345 if (null != accessToken.getUserName()) {
349 346 }
350   - if (null == accessToken.getPassword()) {
  347 + String token = accessToken.getPassword();
  348 + if (null == token || Pattern.compile(".*\\s+.*").matcher(token).matches()) {
351 349 onValidateFailed(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID);
352 350 return;
353 351 }
354   - request.setToken(accessToken.getPassword());
  352 + request.setToken(token);
355 353
356 354 transportService.process(DeviceTransportType.TCP, request.build(),
357 355 new TransportServiceCallback<>() {
... ... @@ -504,26 +502,12 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
504 502
505 503 @Override
506 504 public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
507   -// log.trace("[{}] Received get attributes response", sessionId);
508   -// String topicBase = attrReqTopicType.getAttributesResponseTopicBase();
509   -// TcpTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrReqTopicType);
510   -// try {
511   -// adaptor.convertToPublish(deviceSessionCtx, response, topicBase).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
512   -// } catch (Exception e) {
513   -// log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
514   -// }
  505 +
515 506 }
516 507
517 508 @Override
518 509 public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
519   -// log.trace("[{}] Received attributes update notification to device", sessionId);
520   -// String topic = attrSubTopicType.getAttributesSubTopic();
521   -// TcpTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrSubTopicType);
522   -// try {
523   -// adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
524   -// } catch (Exception e) {
525   -// log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
526   -// }
  510 +
527 511 }
528 512
529 513 @Override
... ... @@ -538,23 +522,23 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
538 522 TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
539 523 try {
540 524 adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
541   -// int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
542   -// if (isAckExpected(payload)) {
543   -// rpcAwaitingAck.put(msgId, rpcRequest);
544   -// context.getScheduler().schedule(() -> {
545   -// TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
546   -// if (msg != null) {
547   -// transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
548   -// }
549   -// }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
550   -// }
  525 + int msgId = rpcRequest.getRequestId();
  526 + boolean oneWay = rpcRequest.getOneway();
  527 + if (!oneWay) {
  528 + rpcAwaitingAck.put(msgId, rpcRequest);
  529 + context.getScheduler().schedule(() -> {
  530 + TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
  531 + if (msg != null) {
  532 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  533 + }
  534 + }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
  535 + }
551 536 var cf = pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas());
552 537 cf.addListener(result -> {
553 538 if (result.cause() == null) {
554   -// if (!isAckExpected(payload)) {
555   -// transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
556   -// } else
557   - if (rpcRequest.getPersisted()) {
  539 + if (oneWay) {
  540 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  541 + } else if (rpcRequest.getPersisted()) {
558 542 transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
559 543 }
560 544 } else {
... ... @@ -573,13 +557,6 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
573 557 @Override
574 558 public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
575 559 log.debug("[{}] 服务端响应设备的RPC请求", sessionId);
576   -// String baseTopic = toServerRpcSubTopicType.getRpcResponseTopicBase();
577   -// TcpTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(toServerRpcSubTopicType);
578   -// try {
579   -// adaptor.convertToPublish(deviceSessionCtx, rpcResponse, baseTopic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
580   -// } catch (Exception e) {
581   -// log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
582   -// }
583 560 }
584 561
585 562 /**
... ...
... ... @@ -93,11 +93,15 @@ public class TcpGatewayDeviceSessionCtx extends TcpDeviceWareSessionContext impl
93 93 try {
94 94 parent.getPayloadAdaptor().convertToPublish(this, request).ifPresent(
95 95 payload -> {
  96 + boolean oneWay = request.getOneway();
96 97 ChannelFuture channelFuture = parent.pushDeviceMsg(payload.getDatas());
97 98 if (request.getPersisted()) {
98 99 channelFuture.addListener(result -> {
99   - if (result.cause() == null) {
  100 + if (oneWay) {
  101 + transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  102 + } else if (request.getPersisted()) {
100 103 transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  104 +
101 105 }
102 106 });
103 107 }
... ...