Commit c29a00656a13d850e4d1b8e224cd56789e9b05d3

Authored by Andrii Shvaika
1 parent e5b988a2

Sequential RPC processing support

@@ -400,9 +400,9 @@ public class ActorSystemContext { @@ -400,9 +400,9 @@ public class ActorSystemContext {
400 @Getter 400 @Getter
401 private String debugPerTenantLimitsConfiguration; 401 private String debugPerTenantLimitsConfiguration;
402 402
403 - @Value("${actors.rpc.sequence.enabled:false}") 403 + @Value("${actors.rpc.sequential:false}")
404 @Getter 404 @Getter
405 - private boolean rpcSequenceEnabled; 405 + private boolean rpcSequential;
406 406
407 @Value("${actors.rpc.max_retries:5}") 407 @Value("${actors.rpc.max_retries:5}")
408 @Getter 408 @Getter
@@ -122,7 +122,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -122,7 +122,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
122 private final Map<UUID, SessionInfo> attributeSubscriptions; 122 private final Map<UUID, SessionInfo> attributeSubscriptions;
123 private final Map<UUID, SessionInfo> rpcSubscriptions; 123 private final Map<UUID, SessionInfo> rpcSubscriptions;
124 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap; 124 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
125 - private final boolean rpcSequenceEnabled; 125 + private final boolean rpcSequential;
126 126
127 private int rpcSeq = 0; 127 private int rpcSeq = 0;
128 private String deviceName; 128 private String deviceName;
@@ -134,7 +134,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -134,7 +134,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
134 super(systemContext); 134 super(systemContext);
135 this.tenantId = tenantId; 135 this.tenantId = tenantId;
136 this.deviceId = deviceId; 136 this.deviceId = deviceId;
137 - this.rpcSequenceEnabled = systemContext.isRpcSequenceEnabled(); 137 + this.rpcSequential = systemContext.isRpcSequential();
138 this.attributeSubscriptions = new HashMap<>(); 138 this.attributeSubscriptions = new HashMap<>();
139 this.rpcSubscriptions = new HashMap<>(); 139 this.rpcSubscriptions = new HashMap<>();
140 this.toDeviceRpcPendingMap = new LinkedHashMap<>(); 140 this.toDeviceRpcPendingMap = new LinkedHashMap<>();
@@ -233,7 +233,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -233,7 +233,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
233 } 233 }
234 234
235 private boolean isSendNewRpcAvailable() { 235 private boolean isSendNewRpcAvailable() {
236 - return !rpcSequenceEnabled || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty(); 236 + return !rpcSequential || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
237 } 237 }
238 238
239 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) { 239 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
@@ -332,7 +332,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -332,7 +332,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
332 } 332 }
333 Set<Integer> sentOneWayIds = new HashSet<>(); 333 Set<Integer> sentOneWayIds = new HashSet<>();
334 334
335 - if (rpcSequenceEnabled) { 335 + if (rpcSequential) {
336 getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); 336 getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
337 } else if (sessionType == SessionType.ASYNC) { 337 } else if (sessionType == SessionType.ASYNC) {
338 toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); 338 toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
@@ -348,7 +348,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -348,7 +348,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
348 } 348 }
349 349
350 private void sendNextPendingRequest(TbActorCtx context) { 350 private void sendNextPendingRequest(TbActorCtx context) {
351 - if (rpcSequenceEnabled) { 351 + if (rpcSequential) {
352 rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId())); 352 rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId()));
353 } 353 }
354 } 354 }
@@ -357,7 +357,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -357,7 +357,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
357 return entry -> { 357 return entry -> {
358 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg(); 358 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
359 ToDeviceRpcRequestBody body = request.getBody(); 359 ToDeviceRpcRequestBody body = request.getBody();
360 - if (request.isOneway() && !rpcSequenceEnabled) { 360 + if (request.isOneway() && !rpcSequential) {
361 sentOneWayIds.add(entry.getKey()); 361 sentOneWayIds.add(entry.getKey());
362 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null)); 362 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null));
363 } 363 }
@@ -599,7 +599,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -599,7 +599,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
599 if (status.equals(RpcStatus.DELIVERED)) { 599 if (status.equals(RpcStatus.DELIVERED)) {
600 if (md.getMsg().getMsg().isOneway()) { 600 if (md.getMsg().getMsg().isOneway()) {
601 toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); 601 toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
602 - if (rpcSequenceEnabled) { 602 + if (rpcSequential) {
603 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null)); 603 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
604 } 604 }
605 } else { 605 } else {
@@ -328,8 +328,7 @@ actors: @@ -328,8 +328,7 @@ actors:
328 duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}" 328 duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}"
329 rpc: 329 rpc:
330 max_retries: "${ACTORS_RPC_MAX_RETRIES:5}" 330 max_retries: "${ACTORS_RPC_MAX_RETRIES:5}"
331 - sequence:  
332 - enabled: "${ACTORS_RPC_SEQUENCE_ENABLED:false}" 331 + sequential: "${ACTORS_RPC_SEQUENTIAL:false}"
333 statistics: 332 statistics:
334 # Enable/disable actor statistics 333 # Enable/disable actor statistics
335 enabled: "${ACTORS_STATISTICS_ENABLED:true}" 334 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -6,4 +6,5 @@ edges.storage.sleep_between_batches=500 @@ -6,4 +6,5 @@ edges.storage.sleep_between_batches=500
6 transport.lwm2m.server.security.key_alias=server 6 transport.lwm2m.server.security.key_alias=server
7 transport.lwm2m.server.security.key_password=server 7 transport.lwm2m.server.security.key_password=server
8 transport.lwm2m.bootstrap.security.key_alias=server 8 transport.lwm2m.bootstrap.security.key_alias=server
9 -transport.lwm2m.bootstrap.security.key_password=server  
  9 +transport.lwm2m.bootstrap.security.key_password=server
  10 +actors.rpc.sequential=true