Commit 71a0f66795398ef1471d574a0baff7785325ff79

Authored by 芯火源
1 parent 08b4dec8

fix: TCP命令下发状态超时问题修复

1、同步时,命令下发后返回状态DELIVERED给演员节点
2、异步时,命令下发后返回状态SENT给演员节点
设备演员在收到的RPC状态为DELIVERED才会删除监控RPC状态的定时任务。
... ... @@ -71,6 +71,7 @@ import java.util.Optional;
71 71 import java.util.UUID;
72 72 import java.util.concurrent.ConcurrentHashMap;
73 73 import java.util.concurrent.ConcurrentMap;
  74 +import java.util.concurrent.TimeUnit;
74 75 import java.util.concurrent.atomic.AtomicInteger;
75 76 import java.util.regex.Pattern;
76 77
... ... @@ -526,11 +527,23 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
526 527 TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
527 528 try {
528 529 adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
529   -
  530 + int msgId = rpcRequest.getRequestId();
  531 + boolean oneWay = rpcRequest.getOneway();
  532 + if (!oneWay) {
  533 + rpcAwaitingAck.put(msgId, rpcRequest);
  534 + context.getScheduler().schedule(() -> {
  535 + TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
  536 + if (msg != null) {
  537 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  538 + }
  539 + }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
  540 + }
530 541 var cf = pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas());
531 542 cf.addListener(result -> {
532 543 if (result.cause() == null) {
533   - if (rpcRequest.getPersisted()) {
  544 + if (oneWay) {
  545 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  546 + } else if (rpcRequest.getPersisted()) {
534 547 transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
535 548 }
536 549 } else {
... ...
... ... @@ -93,11 +93,15 @@ public class TcpGatewayDeviceSessionCtx extends TcpDeviceWareSessionContext impl
93 93 try {
94 94 parent.getPayloadAdaptor().convertToPublish(this, request).ifPresent(
95 95 payload -> {
  96 + boolean oneWay = request.getOneway();
96 97 ChannelFuture channelFuture = parent.pushDeviceMsg(payload.getDatas());
97 98 if (request.getPersisted()) {
98 99 channelFuture.addListener(result -> {
99   - if (result.cause() == null) {
  100 + if (oneWay) {
  101 + transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  102 + } else if (request.getPersisted()) {
100 103 transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  104 +
101 105 }
102 106 });
103 107 }
... ...