Commit 0fbb347b27a4f70e324beac90f79313697abd81b

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent 9720dfac

persistent rpc improvements

@@ -515,7 +515,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -515,7 +515,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
515 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), 515 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
516 responseMsg.getPayload(), null)); 516 responseMsg.getPayload(), null));
517 if (requestMd.getMsg().getMsg().isPersisted()) { 517 if (requestMd.getMsg().getMsg().isPersisted()) {
518 - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.SUCCESSFUL, JacksonUtil.toJsonNode(responseMsg.getPayload())); 518 + RpcStatus status = responseMsg.getFailed() ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
  519 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, JacksonUtil.toJsonNode(responseMsg.getPayload()));
519 } 520 }
520 } else { 521 } else {
521 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); 522 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
@@ -339,6 +339,7 @@ message ToDeviceRpcRequestMsg { @@ -339,6 +339,7 @@ message ToDeviceRpcRequestMsg {
339 message ToDeviceRpcResponseMsg { 339 message ToDeviceRpcResponseMsg {
340 int32 requestId = 1; 340 int32 requestId = 1;
341 string payload = 2; 341 string payload = 2;
  342 + bool failed = 3;
342 } 343 }
343 344
344 message UplinkNotificationMsg { 345 message UplinkNotificationMsg {
@@ -84,7 +84,6 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s @@ -84,7 +84,6 @@ public class LwM2mSessionMsgListener implements GenericFutureListener<Future<? s
84 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) { 84 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
85 log.trace("[{}] Received RPC command to device", sessionId); 85 log.trace("[{}] Received RPC command to device", sessionId);
86 this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo); 86 this.rpcHandler.onToDeviceRpcRequest(toDeviceRequest, this.sessionInfo);
87 - transportService.process(sessionInfo, toDeviceRequest, false, TransportServiceCallback.EMPTY);  
88 } 87 }
89 88
90 @Override 89 @Override
@@ -314,7 +314,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { @@ -314,7 +314,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
314 314
315 private void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, String result, String error) { 315 private void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, String result, String error) {
316 String payload = JacksonUtil.toString(JacksonUtil.newObjectNode().put("result", result).put("error", error)); 316 String payload = JacksonUtil.toString(JacksonUtil.newObjectNode().put("result", result).put("error", error));
317 - TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).build(); 317 + TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).setFailed(true).build();
318 transportService.process(sessionInfo, msg, null); 318 transportService.process(sessionInfo, msg, null);
319 } 319 }
320 320
@@ -16,12 +16,16 @@ @@ -16,12 +16,16 @@
16 package org.thingsboard.server.transport.lwm2m.server.rpc; 16 package org.thingsboard.server.transport.lwm2m.server.rpc;
17 17
18 import org.eclipse.leshan.core.ResponseCode; 18 import org.eclipse.leshan.core.ResponseCode;
  19 +import org.eclipse.leshan.core.request.exception.ClientSleepingException;
19 import org.thingsboard.common.util.JacksonUtil; 20 import org.thingsboard.common.util.JacksonUtil;
20 import org.thingsboard.server.common.transport.TransportService; 21 import org.thingsboard.server.common.transport.TransportService;
  22 +import org.thingsboard.server.common.transport.TransportServiceCallback;
21 import org.thingsboard.server.gen.transport.TransportProtos; 23 import org.thingsboard.server.gen.transport.TransportProtos;
22 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; 24 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
23 import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; 25 import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback;
24 26
  27 +import java.util.concurrent.TimeoutException;
  28 +
25 public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkRequestCallback<R, T> { 29 public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkRequestCallback<R, T> {
26 30
27 private final TransportService transportService; 31 private final TransportService transportService;
@@ -39,6 +43,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR @@ -39,6 +43,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
39 43
40 @Override 44 @Override
41 public void onSuccess(R request, T response) { 45 public void onSuccess(R request, T response) {
  46 + transportService.process(client.getSession(), this.request, false, TransportServiceCallback.EMPTY);
42 sendRpcReplyOnSuccess(response); 47 sendRpcReplyOnSuccess(response);
43 if (callback != null) { 48 if (callback != null) {
44 callback.onSuccess(request, response); 49 callback.onSuccess(request, response);
@@ -55,16 +60,23 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR @@ -55,16 +60,23 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
55 60
56 @Override 61 @Override
57 public void onError(String params, Exception e) { 62 public void onError(String params, Exception e) {
58 - sendRpcReplyOnError(e); 63 + if (!(e instanceof TimeoutException || e instanceof ClientSleepingException)) {
  64 + sendRpcReplyOnError(e);
  65 + }
59 if (callback != null) { 66 if (callback != null) {
60 callback.onError(params, e); 67 callback.onError(params, e);
61 } 68 }
62 } 69 }
63 70
64 protected void reply(LwM2MRpcResponseBody response) { 71 protected void reply(LwM2MRpcResponseBody response) {
  72 + reply(response, false);
  73 + }
  74 +
  75 + protected void reply(LwM2MRpcResponseBody response, boolean failed) {
65 TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder() 76 TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
66 .setPayload(JacksonUtil.toString(response)) 77 .setPayload(JacksonUtil.toString(response))
67 .setRequestId(request.getRequestId()) 78 .setRequestId(request.getRequestId())
  79 + .setFailed(failed)
68 .build(); 80 .build();
69 transportService.process(client.getSession(), msg, null); 81 transportService.process(client.getSession(), msg, null);
70 } 82 }
@@ -72,11 +84,11 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR @@ -72,11 +84,11 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
72 abstract protected void sendRpcReplyOnSuccess(T response); 84 abstract protected void sendRpcReplyOnSuccess(T response);
73 85
74 protected void sendRpcReplyOnValidationError(String msg) { 86 protected void sendRpcReplyOnValidationError(String msg) {
75 - reply(LwM2MRpcResponseBody.builder().result(ResponseCode.BAD_REQUEST.getName()).error(msg).build()); 87 + reply(LwM2MRpcResponseBody.builder().result(ResponseCode.BAD_REQUEST.getName()).error(msg).build(), true);
76 } 88 }
77 89
78 protected void sendRpcReplyOnError(Exception e) { 90 protected void sendRpcReplyOnError(Exception e) {
79 - reply(LwM2MRpcResponseBody.builder().result(ResponseCode.INTERNAL_SERVER_ERROR.getName()).error(e.getMessage()).build()); 91 + reply(LwM2MRpcResponseBody.builder().result(ResponseCode.INTERNAL_SERVER_ERROR.getName()).error(e.getMessage()).build(), true);
80 } 92 }
81 93
82 } 94 }