Commit ba2112ae88540ae522541c3c51331aba74328799

Authored by 芯火源
1 parent 1d92e27a

fix: TCP命令下发为每桶

... ... @@ -500,17 +500,25 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
500 500 log.debug("【{}】下发RPC命令【{}】给设备【{}】", sessionId, rpcRequest.getParams(), deviceSessionCtx.getDeviceInfo().getDeviceName());
501 501 TcpTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
502 502 try {
503   - adaptor.convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
504   - deviceSessionCtx.rpcRequesting(payload.getIdentifier(),rpcRequest);
  503 + adaptor
  504 + .convertToPublish(deviceSessionCtx, rpcRequest)
  505 + .ifPresent(
  506 + payload -> {
  507 + deviceSessionCtx.rpcRequesting(payload.getIdentifier(), rpcRequest);
505 508 var cf = pushDeviceMsg(deviceSessionCtx.getChannel(), payload.getDatas());
506   - cf.addListener(result -> {
507   - if (result.cause() == null) {
508   - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
509   - } else {
  509 + cf.addListener(
  510 + result -> {
  511 + if (result.cause() == null) {
  512 + transportService.process(
  513 + deviceSessionCtx.getSessionInfo(),
  514 + rpcRequest,
  515 + RpcStatus.DELIVERED,
  516 + TransportServiceCallback.EMPTY);
  517 + } else {
510 518 // TODO: send error
511   - }
512   - });
513   - });
  519 + }
  520 + });
  521 + });
514 522 } catch (Exception e) {
515 523 transportService.process(deviceSessionCtx.getSessionInfo(),
516 524 TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
... ... @@ -532,10 +540,13 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements
532 540 * @return
533 541 */
534 542 private ChannelFuture pushDeviceMsg(ChannelHandlerContext ctx, String message) {
535   - byte[] payloadInBytes = ByteUtils.strToBytes(StringUtils.isEmpty(message)?"":message);
536   - ByteBuf payload = Unpooled.copiedBuffer(payloadInBytes);
537   -
538   - return ctx.writeAndFlush(payload);
  543 + ByteBuf buff = Unpooled.buffer();
  544 + if(!message.matches("-?[0-9a-fA-F]+")){
  545 + //不满足16进制将字符串转为16进制
  546 + message = ByteUtils.stringEncodeToHex(message);
  547 + }
  548 + buff.writeBytes(ByteUtils.hexToByteArray(message));
  549 + return ctx.writeAndFlush(buff);
539 550 }
540 551
541 552
... ...
... ... @@ -140,7 +140,7 @@ public class JsonTcpAdaptor implements TcpTransportAdaptor {
140 140 // return Optional.of(Futures.transform(result, t -> JacksonUtil.fromString(t.toString(), TcpDownEntry.class), MoreExecutors.directExecutor()).get());
141 141 String payload = rpcRequest.getParams();//methodThingskit
142 142 if (!payload.startsWith("{") && !payload.endsWith("}")) {
143   - payload = payload.replace("\"","");;
  143 + payload = payload.replace("\"","").replace(" ","");
144 144 }
145 145 TcpDownEntry data = new TcpDownEntry();
146 146 data.setDatas(payload);
... ...