Commit d49bee4b31e6caf902e94d4082621456e96def29

Authored by YevhenBondarenko
1 parent 4ecab480

send next rpc after removing

... ... @@ -400,13 +400,13 @@ public class ActorSystemContext {
400 400 @Getter
401 401 private String debugPerTenantLimitsConfiguration;
402 402
403   - @Value("${actors.rpc.sequence.enabled:true}")
  403 + @Value("${actors.rpc.sequence.enabled:false}")
404 404 @Getter
405 405 private boolean rpcSequenceEnabled;
406 406
407   - @Value("${actors.rpc.persistent.retries:5}")
  407 + @Value("${actors.rpc.max_retries:5}")
408 408 @Getter
409   - private int maxPersistentRpcRetries;
  409 + private int maxRpcRetries;
410 410
411 411 @Getter
412 412 @Setter
... ...
... ... @@ -103,6 +103,7 @@ import java.util.LinkedHashMap;
103 103 import java.util.List;
104 104 import java.util.Map;
105 105 import java.util.Objects;
  106 +import java.util.Optional;
106 107 import java.util.Set;
107 108 import java.util.UUID;
108 109 import java.util.function.Consumer;
... ... @@ -232,15 +233,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
232 233 }
233 234
234 235 private boolean isSendNewRpcAvailable() {
235   - if (rpcSequenceEnabled) {
236   - for (ToDeviceRpcRequestMetadata rpc : toDeviceRpcPendingMap.values()) {
237   - if (!rpc.isDelivered()) {
238   - return false;
239   - }
240   - }
241   - }
242   -
243   - return true;
  236 + return !rpcSequenceEnabled || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
244 237 }
245 238
246 239 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
... ... @@ -282,16 +275,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
282 275
283 276 void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) {
284 277 log.debug("[{}] Processing remove rpc command", msg.getRequestId());
285   - Integer requestId = null;
286   - for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry : toDeviceRpcPendingMap.entrySet()) {
287   - if (entry.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
288   - requestId = entry.getKey();
  278 + Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry = null;
  279 + for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> e : toDeviceRpcPendingMap.entrySet()) {
  280 + if (e.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
  281 + entry = e;
289 282 break;
290 283 }
291 284 }
292 285
293   - if (requestId != null) {
294   - toDeviceRpcPendingMap.remove(requestId);
  286 + if (entry != null) {
  287 + if (entry.getValue().isDelivered()) {
  288 + toDeviceRpcPendingMap.remove(entry.getKey());
  289 + } else {
  290 + Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> firstRpc = getFirstRpc();
  291 + if (firstRpc.isPresent() && entry.getKey().equals(firstRpc.get().getKey())) {
  292 + toDeviceRpcPendingMap.remove(entry.getKey());
  293 + sendNextPendingRequest(context);
  294 + } else {
  295 + toDeviceRpcPendingMap.remove(entry.getKey());
  296 + }
  297 + }
295 298 }
296 299 }
297 300
... ... @@ -330,7 +333,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
330 333 Set<Integer> sentOneWayIds = new HashSet<>();
331 334
332 335 if (rpcSequenceEnabled) {
333   - toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  336 + getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
334 337 } else if (sessionType == SessionType.ASYNC) {
335 338 toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
336 339 } else {
... ... @@ -340,6 +343,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
340 343 sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove);
341 344 }
342 345
  346 + private Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> getFirstRpc() {
  347 + return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst();
  348 + }
  349 +
343 350 private void sendNextPendingRequest(TbActorCtx context) {
344 351 if (rpcSequenceEnabled) {
345 352 rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId()));
... ... @@ -599,7 +606,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
599 606 md.setDelivered(true);
600 607 }
601 608 } else if (status.equals(RpcStatus.TIMEOUT)) {
602   - if (systemContext.getMaxPersistentRpcRetries() <= md.getRetries()) {
  609 + Integer maxRpcRetries = md.getMsg().getMsg().getRetries();
  610 + maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
  611 + if (maxRpcRetries <= md.getRetries()) {
603 612 toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
604 613 status = RpcStatus.FAILED;
605 614 } else {
... ...
... ... @@ -80,6 +80,7 @@ public abstract class AbstractRpcController extends BaseController {
80 80 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
81 81 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean();
82 82 String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO));
  83 + Integer retries = rpcRequestBody.has(DataConstants.RETRIES) ? rpcRequestBody.get(DataConstants.RETRIES).asInt() : null;
83 84 accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<>() {
84 85 @Override
85 86 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
... ... @@ -90,6 +91,7 @@ public abstract class AbstractRpcController extends BaseController {
90 91 expTime,
91 92 body,
92 93 persisted,
  94 + retries,
93 95 additionalInfo
94 96 );
95 97 deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus), currentUser);
... ...
... ... @@ -166,6 +166,11 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
166 166 metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
167 167 metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(msg.isPersisted()));
168 168
  169 + if (msg.getRetries() != null) {
  170 + metaData.putValue(DataConstants.RETRIES, msg.getRetries().toString());
  171 + }
  172 +
  173 +
169 174 Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId());
170 175 if (device != null) {
171 176 metaData.putValue("deviceName", device.getName());
... ...
... ... @@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
101 101 @Override
102 102 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
103 103 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
104   - src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo());
  104 + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getRetries(), src.getAdditionalInfo());
105 105 forwardRpcRequestToDeviceActor(request, response -> {
106 106 if (src.isRestApiCall()) {
107 107 sendRpcResponseToTbCore(src.getOriginServiceId(), response);
... ...
... ... @@ -39,6 +39,7 @@ public class DataConstants {
39 39 public static final String TIMEOUT = "timeout";
40 40 public static final String EXPIRATION_TIME = "expirationTime";
41 41 public static final String ADDITIONAL_INFO = "additionalInfo";
  42 + public static final String RETRIES = "retries";
42 43 public static final String COAP_TRANSPORT_NAME = "COAP";
43 44 public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
44 45 public static final String MQTT_TRANSPORT_NAME = "MQTT";
... ...
... ... @@ -36,6 +36,7 @@ public class ToDeviceRpcRequest implements Serializable {
36 36 private final long expirationTime;
37 37 private final ToDeviceRpcRequestBody body;
38 38 private final boolean persisted;
  39 + private final Integer retries;
39 40 @JsonIgnore
40 41 private final String additionalInfo;
41 42 }
... ...
... ... @@ -41,5 +41,5 @@ public final class RuleEngineDeviceRpcRequest {
41 41 private final long expirationTime;
42 42 private final boolean restApiCall;
43 43 private final String additionalInfo;
44   -
  44 + private final Integer retries;
45 45 }
... ...
... ... @@ -92,6 +92,9 @@ public class TbSendRPCRequestNode implements TbNode {
92 92 tmp = msg.getMetaData().getValue(DataConstants.EXPIRATION_TIME);
93 93 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
94 94
  95 + tmp = msg.getMetaData().getValue(DataConstants.RETRIES);
  96 + Integer retries = !StringUtils.isEmpty(tmp) ? Integer.parseInt(tmp) : null;
  97 +
95 98 String params;
96 99 JsonElement paramsEl = json.get("params");
97 100 if (paramsEl.isJsonPrimitive()) {
... ... @@ -112,6 +115,7 @@ public class TbSendRPCRequestNode implements TbNode {
112 115 .requestUUID(requestUUID)
113 116 .originServiceId(originServiceId)
114 117 .expirationTime(expirationTime)
  118 + .retries(retries)
115 119 .restApiCall(restApiCall)
116 120 .persisted(persisted)
117 121 .additionalInfo(additionalInfo)
... ...