Commit f7aa8cc7cee53ec7c3330daeb52c7a808ed70544

Authored by Andrew Shvayka
Committed by GitHub
2 parents 615f5173 24aa6f30

Merge pull request #5522 from YevhenBondarenko/master

[3.3.2] Lwm2M Improvements to avoid duplication of RPC call
@@ -117,6 +117,10 @@ public class LwM2mClient implements Serializable { @@ -117,6 +117,10 @@ public class LwM2mClient implements Serializable {
117 @Getter 117 @Getter
118 private final AtomicInteger retryAttempts; 118 private final AtomicInteger retryAttempts;
119 119
  120 + @Getter
  121 + @Setter
  122 + private UUID lastSentRpcId;
  123 +
120 public Object clone() throws CloneNotSupportedException { 124 public Object clone() throws CloneNotSupportedException {
121 return super.clone(); 125 return super.clone();
122 } 126 }
@@ -319,6 +319,10 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im @@ -319,6 +319,10 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
319 Registration registration = client.getRegistration(); 319 Registration registration = client.getRegistration();
320 try { 320 try {
321 logService.log(client, String.format("[%s][%s] Sending request: %s to %s", registration.getId(), registration.getSocketAddress(), request.getClass().getSimpleName(), pathToStringFunction.apply(request))); 321 logService.log(client, String.format("[%s][%s] Sending request: %s to %s", registration.getId(), registration.getSocketAddress(), request.getClass().getSimpleName(), pathToStringFunction.apply(request)));
  322 + if (!callback.onSent(request)) {
  323 + return;
  324 + }
  325 +
322 context.getServer().send(registration, request, timeoutInMs, response -> { 326 context.getServer().send(registration, request, timeoutInMs, response -> {
323 executor.submit(() -> { 327 executor.submit(() -> {
324 try { 328 try {
@@ -330,7 +334,6 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im @@ -330,7 +334,6 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
330 } 334 }
331 }); 335 });
332 }, e -> handleDownlinkError(client, request, callback, e)); 336 }, e -> handleDownlinkError(client, request, callback, e));
333 - callback.onSent(request);  
334 } catch (Exception e) { 337 } catch (Exception e) {
335 handleDownlinkError(client, request, callback, e); 338 handleDownlinkError(client, request, callback, e);
336 } 339 }
@@ -366,6 +369,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im @@ -366,6 +369,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
366 369
367 private <R extends DownlinkRequest<T>, T extends LwM2mResponse> void handleDownlinkError(LwM2mClient client, R request, DownlinkRequestCallback<R, T> callback, Exception e) { 370 private <R extends DownlinkRequest<T>, T extends LwM2mResponse> void handleDownlinkError(LwM2mClient client, R request, DownlinkRequestCallback<R, T> callback, Exception e) {
368 log.trace("[{}] Received downlink error: {}.", client.getEndpoint(), e); 371 log.trace("[{}] Received downlink error: {}.", client.getEndpoint(), e);
  372 + client.updateLastUplinkTime();
369 executor.submit(() -> { 373 executor.submit(() -> {
370 if (e instanceof TimeoutException || e instanceof ClientSleepingException) { 374 if (e instanceof TimeoutException || e instanceof ClientSleepingException) {
371 log.trace("[{}] Received {}, client is probably sleeping", client.getEndpoint(), e.getClass().getSimpleName()); 375 log.trace("[{}] Received {}, client is probably sleeping", client.getEndpoint(), e.getClass().getSimpleName());
@@ -17,7 +17,9 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; @@ -17,7 +17,9 @@ package org.thingsboard.server.transport.lwm2m.server.downlink;
17 17
18 public interface DownlinkRequestCallback<R, T> { 18 public interface DownlinkRequestCallback<R, T> {
19 19
20 - default void onSent(R request){}; 20 + default boolean onSent(R request){
  21 + return true;
  22 + };
21 23
22 void onSuccess(R request, T response); 24 void onSuccess(R request, T response);
23 25
@@ -95,6 +95,12 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { @@ -95,6 +95,12 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
95 this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty"); 95 this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty");
96 return; 96 return;
97 } 97 }
  98 + UUID rpcId = new UUID(rpcRequest.getRequestIdMSB(), rpcRequest.getRequestIdLSB());
  99 +
  100 + if (rpcId.equals(client.getLastSentRpcId())) {
  101 + log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId);
  102 + return;
  103 + }
98 try { 104 try {
99 if (operationType.isHasObjectId()) { 105 if (operationType.isHasObjectId()) {
100 String objectId = getIdFromParameters(client, rpcRequest); 106 String objectId = getIdFromParameters(client, rpcRequest);
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.transport.lwm2m.server.rpc; 16 package org.thingsboard.server.transport.lwm2m.server.rpc;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
18 import org.eclipse.leshan.core.ResponseCode; 19 import org.eclipse.leshan.core.ResponseCode;
19 import org.eclipse.leshan.core.request.exception.ClientSleepingException; 20 import org.eclipse.leshan.core.request.exception.ClientSleepingException;
20 import org.thingsboard.common.util.JacksonUtil; 21 import org.thingsboard.common.util.JacksonUtil;
@@ -26,8 +27,10 @@ import org.thingsboard.server.gen.transport.TransportProtos; @@ -26,8 +27,10 @@ import org.thingsboard.server.gen.transport.TransportProtos;
26 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; 27 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
27 import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; 28 import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback;
28 29
  30 +import java.util.UUID;
29 import java.util.concurrent.TimeoutException; 31 import java.util.concurrent.TimeoutException;
30 32
  33 +@Slf4j
31 public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkRequestCallback<R, T> { 34 public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkRequestCallback<R, T> {
32 35
33 private final TransportService transportService; 36 private final TransportService transportService;
@@ -44,8 +47,20 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR @@ -44,8 +47,20 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
44 } 47 }
45 48
46 @Override 49 @Override
47 - public void onSent(R request) { 50 + public boolean onSent(R request) {
  51 + client.lock();
  52 + try {
  53 + UUID rpcId = new UUID(this.request.getRequestIdMSB(), this.request.getRequestIdLSB());
  54 + if (rpcId.equals(client.getLastSentRpcId())) {
  55 + log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId);
  56 + return false;
  57 + }
  58 + client.setLastSentRpcId(rpcId);
  59 + } finally {
  60 + client.unlock();
  61 + }
48 transportService.process(client.getSession(), this.request, RpcStatus.SENT, TransportServiceCallback.EMPTY); 62 transportService.process(client.getSession(), this.request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  63 + return true;
49 } 64 }
50 65
51 @Override 66 @Override
@@ -68,6 +83,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR @@ -68,6 +83,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
68 @Override 83 @Override
69 public void onError(String params, Exception e) { 84 public void onError(String params, Exception e) {
70 if (e instanceof TimeoutException || e instanceof org.eclipse.leshan.core.request.exception.TimeoutException) { 85 if (e instanceof TimeoutException || e instanceof org.eclipse.leshan.core.request.exception.TimeoutException) {
  86 + client.setLastSentRpcId(null);
71 transportService.process(client.getSession(), this.request, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); 87 transportService.process(client.getSession(), this.request, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
72 } else if (!(e instanceof ClientSleepingException)) { 88 } else if (!(e instanceof ClientSleepingException)) {
73 sendRpcReplyOnError(e); 89 sendRpcReplyOnError(e);