Commit 183f3547a361c9cff0ef365cfd20da37ea5f3de3

Authored by YevhenBondarenko
1 parent b5da59e3

added lock for onSent rpc

@@ -119,7 +119,7 @@ public class LwM2mClient implements Serializable { @@ -119,7 +119,7 @@ public class LwM2mClient implements Serializable {
119 119
120 @Getter 120 @Getter
121 @Setter 121 @Setter
122 - private Integer lastSentRpcId; 122 + private UUID lastSentRpcId;
123 123
124 public Object clone() throws CloneNotSupportedException { 124 public Object clone() throws CloneNotSupportedException {
125 return super.clone(); 125 return super.clone();
@@ -319,6 +319,10 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im @@ -319,6 +319,10 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
319 Registration registration = client.getRegistration(); 319 Registration registration = client.getRegistration();
320 try { 320 try {
321 logService.log(client, String.format("[%s][%s] Sending request: %s to %s", registration.getId(), registration.getSocketAddress(), request.getClass().getSimpleName(), pathToStringFunction.apply(request))); 321 logService.log(client, String.format("[%s][%s] Sending request: %s to %s", registration.getId(), registration.getSocketAddress(), request.getClass().getSimpleName(), pathToStringFunction.apply(request)));
  322 + if (!callback.onSent(request)) {
  323 + return;
  324 + }
  325 +
322 context.getServer().send(registration, request, timeoutInMs, response -> { 326 context.getServer().send(registration, request, timeoutInMs, response -> {
323 executor.submit(() -> { 327 executor.submit(() -> {
324 try { 328 try {
@@ -330,7 +334,6 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im @@ -330,7 +334,6 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
330 } 334 }
331 }); 335 });
332 }, e -> handleDownlinkError(client, request, callback, e)); 336 }, e -> handleDownlinkError(client, request, callback, e));
333 - callback.onSent(request);  
334 } catch (Exception e) { 337 } catch (Exception e) {
335 handleDownlinkError(client, request, callback, e); 338 handleDownlinkError(client, request, callback, e);
336 } 339 }
@@ -17,7 +17,9 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; @@ -17,7 +17,9 @@ package org.thingsboard.server.transport.lwm2m.server.downlink;
17 17
18 public interface DownlinkRequestCallback<R, T> { 18 public interface DownlinkRequestCallback<R, T> {
19 19
20 - default void onSent(R request){}; 20 + default boolean onSent(R request){
  21 + return true;
  22 + };
21 23
22 void onSuccess(R request, T response); 24 void onSuccess(R request, T response);
23 25
@@ -95,8 +95,11 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { @@ -95,8 +95,11 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
95 this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty"); 95 this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty");
96 return; 96 return;
97 } 97 }
98 - if (client.getLastSentRpcId() != null && client.getLastSentRpcId().equals(rpcRequest.getRequestId())) {  
99 - log.info("[{}] Rpc has already sent!", rpcRequest.getRequestId()); 98 + UUID rpcId = new UUID(rpcRequest.getRequestIdMSB(), rpcRequest.getRequestIdLSB());
  99 +
  100 + if (rpcId.equals(client.getLastSentRpcId())) {
  101 + log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId);
  102 + return;
100 } 103 }
101 try { 104 try {
102 if (operationType.isHasObjectId()) { 105 if (operationType.isHasObjectId()) {
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.transport.lwm2m.server.rpc; 16 package org.thingsboard.server.transport.lwm2m.server.rpc;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
18 import org.eclipse.leshan.core.ResponseCode; 19 import org.eclipse.leshan.core.ResponseCode;
19 import org.eclipse.leshan.core.request.exception.ClientSleepingException; 20 import org.eclipse.leshan.core.request.exception.ClientSleepingException;
20 import org.thingsboard.common.util.JacksonUtil; 21 import org.thingsboard.common.util.JacksonUtil;
@@ -26,8 +27,10 @@ import org.thingsboard.server.gen.transport.TransportProtos; @@ -26,8 +27,10 @@ import org.thingsboard.server.gen.transport.TransportProtos;
26 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; 27 import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
27 import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; 28 import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback;
28 29
  30 +import java.util.UUID;
29 import java.util.concurrent.TimeoutException; 31 import java.util.concurrent.TimeoutException;
30 32
  33 +@Slf4j
31 public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkRequestCallback<R, T> { 34 public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkRequestCallback<R, T> {
32 35
33 private final TransportService transportService; 36 private final TransportService transportService;
@@ -44,9 +47,20 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR @@ -44,9 +47,20 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
44 } 47 }
45 48
46 @Override 49 @Override
47 - public void onSent(R request) {  
48 - client.setLastSentRpcId(this.request.getRequestId()); 50 + public boolean onSent(R request) {
  51 + client.lock();
  52 + try {
  53 + UUID rpcId = new UUID(this.request.getRequestIdMSB(), this.request.getRequestIdLSB());
  54 + if (rpcId.equals(client.getLastSentRpcId())) {
  55 + log.debug("[{}]][{}] Rpc has already sent!", client.getEndpoint(), rpcId);
  56 + return false;
  57 + }
  58 + client.setLastSentRpcId(rpcId);
  59 + } finally {
  60 + client.unlock();
  61 + }
49 transportService.process(client.getSession(), this.request, RpcStatus.SENT, TransportServiceCallback.EMPTY); 62 transportService.process(client.getSession(), this.request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  63 + return true;
50 } 64 }
51 65
52 @Override 66 @Override