Commit f932a90b00b7f8fffc61285b389dc8d45ac68449

Authored by Andrii Shvaika
1 parent 0fbb347b

Improvements to Persistent RPC call delivery confirmation

... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.actors.device;
17 17
  18 +import com.fasterxml.jackson.databind.JsonNode;
18 19 import com.fasterxml.jackson.databind.node.ObjectNode;
19 20 import com.google.common.util.concurrent.FutureCallback;
20 21 import com.google.common.util.concurrent.Futures;
... ... @@ -35,6 +36,7 @@ import org.thingsboard.server.actors.TbActorCtx;
35 36 import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
36 37 import org.thingsboard.server.common.data.DataConstants;
37 38 import org.thingsboard.server.common.data.Device;
  39 +import org.thingsboard.server.common.data.StringUtils;
38 40 import org.thingsboard.server.common.data.edge.EdgeEvent;
39 41 import org.thingsboard.server.common.data.edge.EdgeEventActionType;
40 42 import org.thingsboard.server.common.data.edge.EdgeEventType;
... ... @@ -512,11 +514,20 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
512 514 ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
513 515 boolean success = requestMd != null;
514 516 if (success) {
515   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
516   - responseMsg.getPayload(), null));
  517 + boolean hasError = StringUtils.isNotEmpty(responseMsg.getError());
  518 + String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
  519 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(
  520 + new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  521 + payload, hasError ? RpcError.INTERNAL : null));
517 522 if (requestMd.getMsg().getMsg().isPersisted()) {
518   - RpcStatus status = responseMsg.getFailed() ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
519   - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, JacksonUtil.toJsonNode(responseMsg.getPayload()));
  523 + RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
  524 + JsonNode response;
  525 + try {
  526 + response = JacksonUtil.toJsonNode(payload);
  527 + } catch (IllegalArgumentException e) {
  528 + response = JacksonUtil.newObjectNode().put("error", payload);
  529 + }
  530 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
520 531 }
521 532 } else {
522 533 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
... ...
... ... @@ -339,7 +339,7 @@ message ToDeviceRpcRequestMsg {
339 339 message ToDeviceRpcResponseMsg {
340 340 int32 requestId = 1;
341 341 string payload = 2;
342   - bool failed = 3;
  342 + string error = 3;
343 343 }
344 344
345 345 message UplinkNotificationMsg {
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.DataConstants;
28 28 import org.thingsboard.server.common.data.Device;
29 29 import org.thingsboard.server.common.data.DeviceProfile;
30 30 import org.thingsboard.server.common.data.DeviceTransportType;
  31 +import org.thingsboard.server.common.data.StringUtils;
31 32 import org.thingsboard.server.common.data.device.data.PowerMode;
32 33 import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration;
33 34 import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration;
... ... @@ -507,6 +508,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
507 508 return;
508 509 }
509 510 boolean sent = false;
  511 + String error = null;
510 512 boolean conRequest = AbstractSyncSessionCallback.isConRequest(state.getRpc());
511 513 try {
512 514 Response response = state.getAdaptor().convertToPublish(conRequest, msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder());
... ... @@ -515,15 +517,12 @@ public class DefaultCoapClientContext implements CoapClientContext {
515 517 if (msg.getPersisted() && conRequest) {
516 518 transportContext.getRpcAwaitingAck().put(requestId, msg);
517 519 transportContext.getScheduler().schedule(() -> {
518   - TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = transportContext.getRpcAwaitingAck().remove(requestId);
519   - if (awaitingAckMsg != null) {
520   - transportService.process(state.getSession(), msg, true, TransportServiceCallback.EMPTY);
521   - }
  520 + transportContext.getRpcAwaitingAck().remove(requestId);
522 521 }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
523 522 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
524 523 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
525 524 if (rpcRequestMsg != null) {
526   - transportService.process(state.getSession(), rpcRequestMsg, false, TransportServiceCallback.EMPTY);
  525 + transportService.process(state.getSession(), rpcRequestMsg, TransportServiceCallback.EMPTY);
527 526 }
528 527 }, null));
529 528 }
... ... @@ -536,9 +535,16 @@ public class DefaultCoapClientContext implements CoapClientContext {
536 535 log.trace("Failed to reply due to error", e);
537 536 cancelObserveRelation(state.getRpc());
538 537 cancelRpcSubscription(state);
  538 + error = "Failed to convert device RPC command to CoAP msg";
  539 + } catch (Exception e) {
  540 + error = "Internal error: " + e.getMessage();
539 541 } finally {
540   - if (msg.getPersisted() && !conRequest) {
541   - transportService.process(state.getSession(), msg, sent, TransportServiceCallback.EMPTY);
  542 + if (StringUtils.isNotEmpty(error)) {
  543 + transportService.process(state.getSession(),
  544 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  545 + .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY);
  546 + } else if (msg.getPersisted() && !conRequest && sent) {
  547 + transportService.process(state.getSession(), msg, TransportServiceCallback.EMPTY);
542 548 }
543 549 }
544 550 }
... ...
... ... @@ -409,7 +409,7 @@ public class DeviceApiController implements TbTransportService {
409 409 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) {
410 410 log.trace("[{}] Received RPC command to device", sessionId);
411 411 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
412   - transportService.process(sessionInfo, msg, false, TransportServiceCallback.EMPTY);
  412 + transportService.process(sessionInfo, msg, TransportServiceCallback.EMPTY);
413 413 }
414 414
415 415 @Override
... ...
... ... @@ -89,12 +89,12 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
89 89 if (!this.rpcSubscriptions.containsKey(requestUUID)) {
90 90 LwM2mOperationType operationType = LwM2mOperationType.fromType(rpcRequest.getMethodName());
91 91 if (operationType == null) {
92   - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.METHOD_NOT_ALLOWED.getName(), "Unsupported operation type: " + rpcRequest.getMethodName());
  92 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.METHOD_NOT_ALLOWED, "Unsupported operation type: " + rpcRequest.getMethodName());
93 93 return;
94 94 }
95 95 LwM2mClient client = clientContext.getClientBySessionInfo(sessionInfo);
96 96 if (client.getRegistration() == null) {
97   - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR.getName(), "Registration is empty");
  97 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty");
98 98 return;
99 99 }
100 100 try {
... ... @@ -145,7 +145,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
145 145 }
146 146 } else {
147 147 this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
148   - ResponseCode.INTERNAL_SERVER_ERROR.getName(), "This device does not support Composite Operation");
  148 + ResponseCode.INTERNAL_SERVER_ERROR, "This device does not support Composite Operation");
149 149 }
150 150 } else {
151 151 switch (operationType) {
... ... @@ -165,7 +165,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
165 165 }
166 166 }
167 167 } catch (IllegalArgumentException e) {
168   - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST.getName(), e.getMessage());
  168 + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage());
169 169 }
170 170 }
171 171 }
... ... @@ -312,9 +312,9 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
312 312 }
313 313 }
314 314
315   - private void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, String result, String error) {
316   - String payload = JacksonUtil.toString(JacksonUtil.newObjectNode().put("result", result).put("error", error));
317   - TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(payload).setFailed(true).build();
  315 + private void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String error) {
  316 + String payload = JacksonUtil.toString(LwM2MRpcResponseBody.builder().result(result.getName()).error(error).build());
  317 + TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setError(payload).build();
318 318 transportService.process(sessionInfo, msg, null);
319 319 }
320 320
... ...
... ... @@ -27,6 +27,5 @@ public class LwM2MRpcResponseBody {
27 27 private String result;
28 28 private String value;
29 29 private String error;
30   - private String info;
31 30
32 31 }
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.lwm2m.server.rpc;
18 18 import org.eclipse.leshan.core.ResponseCode;
19 19 import org.eclipse.leshan.core.request.exception.ClientSleepingException;
20 20 import org.thingsboard.common.util.JacksonUtil;
  21 +import org.thingsboard.server.common.data.StringUtils;
21 22 import org.thingsboard.server.common.transport.TransportService;
22 23 import org.thingsboard.server.common.transport.TransportServiceCallback;
23 24 import org.thingsboard.server.gen.transport.TransportProtos;
... ... @@ -43,7 +44,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
43 44
44 45 @Override
45 46 public void onSuccess(R request, T response) {
46   - transportService.process(client.getSession(), this.request, false, TransportServiceCallback.EMPTY);
  47 + transportService.process(client.getSession(), this.request, TransportServiceCallback.EMPTY);
47 48 sendRpcReplyOnSuccess(response);
48 49 if (callback != null) {
49 50 callback.onSuccess(request, response);
... ... @@ -69,26 +70,24 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
69 70 }
70 71
71 72 protected void reply(LwM2MRpcResponseBody response) {
72   - reply(response, false);
73   - }
74   -
75   - protected void reply(LwM2MRpcResponseBody response, boolean failed) {
76   - TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
77   - .setPayload(JacksonUtil.toString(response))
78   - .setRequestId(request.getRequestId())
79   - .setFailed(failed)
80   - .build();
81   - transportService.process(client.getSession(), msg, null);
  73 + TransportProtos.ToDeviceRpcResponseMsg.Builder msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(request.getRequestId());
  74 + String responseAsString = JacksonUtil.toString(response);
  75 + if (StringUtils.isEmpty(response.getError())) {
  76 + msg.setPayload(responseAsString);
  77 + } else {
  78 + msg.setError(responseAsString);
  79 + }
  80 + transportService.process(client.getSession(), msg.build(), null);
82 81 }
83 82
84 83 abstract protected void sendRpcReplyOnSuccess(T response);
85 84
86 85 protected void sendRpcReplyOnValidationError(String msg) {
87   - reply(LwM2MRpcResponseBody.builder().result(ResponseCode.BAD_REQUEST.getName()).error(msg).build(), true);
  86 + reply(LwM2MRpcResponseBody.builder().result(ResponseCode.BAD_REQUEST.getName()).error(msg).build());
88 87 }
89 88
90 89 protected void sendRpcReplyOnError(Exception e) {
91   - reply(LwM2MRpcResponseBody.builder().result(ResponseCode.INTERNAL_SERVER_ERROR.getName()).error(e.getMessage()).build(), true);
  90 + reply(LwM2MRpcResponseBody.builder().result(ResponseCode.INTERNAL_SERVER_ERROR.getName()).error(e.getMessage()).build());
92 91 }
93 92
94 93 }
... ...
... ... @@ -249,7 +249,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
249 249 int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
250 250 TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
251 251 if (rpcRequest != null) {
252   - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, false, TransportServiceCallback.EMPTY);
  252 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY);
253 253 }
254 254 break;
255 255 default:
... ... @@ -829,18 +829,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
829 829 if (rpcRequest.getPersisted() && isAckExpected(payload)) {
830 830 rpcAwaitingAck.put(msgId, rpcRequest);
831 831 context.getScheduler().schedule(() -> {
832   - TransportProtos.ToDeviceRpcRequestMsg awaitingAckMsg = rpcAwaitingAck.remove(msgId);
833   - if (awaitingAckMsg != null) {
834   - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, true, TransportServiceCallback.EMPTY);
835   - }
  832 + rpcAwaitingAck.remove(msgId);
836 833 }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
837 834 }
838 835 var cf = publish(payload, deviceSessionCtx);
839 836 if (rpcRequest.getPersisted() && !isAckExpected(payload)) {
840   - cf.addListener(result -> transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, result.cause() != null, TransportServiceCallback.EMPTY));
  837 + cf.addListener(result -> {
  838 + if (result.cause() == null) {
  839 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY);
  840 + }
  841 + });
841 842 }
842 843 });
843 844 } catch (Exception e) {
  845 + transportService.process(deviceSessionCtx.getSessionInfo(),
  846 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  847 + .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
844 848 log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
845 849 }
846 850 }
... ...
... ... @@ -101,13 +101,18 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
101 101 payload -> {
102 102 ChannelFuture channelFuture = parent.writeAndFlush(payload);
103 103 if (request.getPersisted()) {
104   - channelFuture.addListener(future ->
105   - transportService.process(getSessionInfo(), request, future.cause() != null, TransportServiceCallback.EMPTY)
106   - );
  104 + channelFuture.addListener(future -> {
  105 + if (future.cause() == null) {
  106 + transportService.process(getSessionInfo(), request, TransportServiceCallback.EMPTY);
  107 + }
  108 + });
107 109 }
108 110 }
109 111 );
110 112 } catch (Exception e) {
  113 + transportService.process(getSessionInfo(),
  114 + TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
  115 + .setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
111 116 log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
112 117 }
113 118 }
... ...
... ... @@ -142,7 +142,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
142 142 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
143 143 log.trace("[{}] Received RPC command to device", sessionId);
144 144 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
145   - snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, false, TransportServiceCallback.EMPTY);
  145 + snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, TransportServiceCallback.EMPTY);
146 146 }
147 147
148 148 @Override
... ...
... ... @@ -110,7 +110,7 @@ public interface TransportService {
110 110
111 111 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
112 112
113   - void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, boolean isFailedRpc, TransportServiceCallback<Void> callback);
  113 + void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, TransportServiceCallback<Void> callback);
114 114
115 115 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
116 116
... ...
... ... @@ -580,17 +580,9 @@ public class DefaultTransportService implements TransportService {
580 580 }
581 581
582 582 @Override
583   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, boolean isFailedRpc, TransportServiceCallback<Void> callback) {
  583 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
584 584 if (msg.getPersisted()) {
585   - RpcStatus status;
586   -
587   - if (isFailedRpc) {
588   - status = RpcStatus.FAILED;
589   - } else if (msg.getOneway()) {
590   - status = RpcStatus.SUCCESSFUL;
591   - } else {
592   - status = RpcStatus.DELIVERED;
593   - }
  585 + RpcStatus status = msg.getOneway() ? RpcStatus.SUCCESSFUL : RpcStatus.DELIVERED;
594 586
595 587 TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
596 588 .setRequestId(msg.getRequestId())
... ...