Commit 5d6ec0dd0e67b1de1e4793ef706a7d753ef51bdd

Authored by YevhenBondarenko
1 parent 8513c999

refactoring

@@ -387,7 +387,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -387,7 +387,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
387 handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice()); 387 handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
388 } 388 }
389 if (msg.hasRpcResponseStatusMsg()) { 389 if (msg.hasRpcResponseStatusMsg()) {
390 - processPersistedRpcResponses(context, sessionInfo, msg.getRpcResponseStatusMsg()); 390 + processRpcResponseStatus(context, sessionInfo, msg.getRpcResponseStatusMsg());
391 } 391 }
392 if (msg.hasUplinkNotificationMsg()) { 392 if (msg.hasUplinkNotificationMsg()) {
393 processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg()); 393 processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
@@ -572,7 +572,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -572,7 +572,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
572 } 572 }
573 } 573 }
574 574
575 - private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseStatusMsg responseMsg) { 575 + private void processRpcResponseStatus(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseStatusMsg responseMsg) {
576 UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB()); 576 UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
577 RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus()); 577 RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus());
578 ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId()); 578 ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
@@ -106,10 +106,11 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -106,10 +106,11 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
106 if (request.getPersisted()) { 106 if (request.getPersisted()) {
107 channelFuture.addListener(result -> { 107 channelFuture.addListener(result -> {
108 if (result.cause() == null) { 108 if (result.cause() == null) {
109 - if (isAckExpected(payload)) {  
110 - transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);  
111 - } else { 109 + if (!isAckExpected(payload)) {
112 transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); 110 transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  111 + } else if (request.getPersisted()) {
  112 + transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  113 +
113 } 114 }
114 } 115 }
115 }); 116 });