Commit acee154840e162e18d777690f1c1e32cbb40663b

Authored by Andrew Shvayka
Committed by GitHub
2 parents 2797367e c29a0065

Merge pull request #5114 from thingsboard/feature/sequential-rpc

[3.3.1] Sequential processing of RPC calls
Showing 27 changed files with 342 additions and 121 deletions
@@ -400,6 +400,14 @@ public class ActorSystemContext { @@ -400,6 +400,14 @@ public class ActorSystemContext {
400 @Getter 400 @Getter
401 private String debugPerTenantLimitsConfiguration; 401 private String debugPerTenantLimitsConfiguration;
402 402
  403 + @Value("${actors.rpc.sequential:false}")
  404 + @Getter
  405 + private boolean rpcSequential;
  406 +
  407 + @Value("${actors.rpc.max_retries:5}")
  408 + @Getter
  409 + private int maxRpcRetries;
  410 +
403 @Getter 411 @Getter
404 @Setter 412 @Setter
405 private TbActorSystem actorSystem; 413 private TbActorSystem actorSystem;
@@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
48 import org.thingsboard.server.common.data.kv.KvEntry; 48 import org.thingsboard.server.common.data.kv.KvEntry;
49 import org.thingsboard.server.common.data.page.PageData; 49 import org.thingsboard.server.common.data.page.PageData;
50 import org.thingsboard.server.common.data.page.PageLink; 50 import org.thingsboard.server.common.data.page.PageLink;
  51 +import org.thingsboard.server.common.data.page.SortOrder;
51 import org.thingsboard.server.common.data.relation.EntityRelation; 52 import org.thingsboard.server.common.data.relation.EntityRelation;
52 import org.thingsboard.server.common.data.relation.RelationTypeGroup; 53 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
53 import org.thingsboard.server.common.data.rpc.Rpc; 54 import org.thingsboard.server.common.data.rpc.Rpc;
@@ -79,9 +80,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType; @@ -79,9 +80,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
79 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; 80 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
80 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; 81 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
81 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; 82 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
82 -import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg;  
83 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; 83 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
84 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; 84 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
  85 +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseStatusMsg;
85 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; 86 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
86 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 87 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
87 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; 88 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
@@ -98,9 +99,11 @@ import java.util.Arrays; @@ -98,9 +99,11 @@ import java.util.Arrays;
98 import java.util.Collections; 99 import java.util.Collections;
99 import java.util.HashMap; 100 import java.util.HashMap;
100 import java.util.HashSet; 101 import java.util.HashSet;
  102 +import java.util.LinkedHashMap;
101 import java.util.List; 103 import java.util.List;
102 import java.util.Map; 104 import java.util.Map;
103 import java.util.Objects; 105 import java.util.Objects;
  106 +import java.util.Optional;
104 import java.util.Set; 107 import java.util.Set;
105 import java.util.UUID; 108 import java.util.UUID;
106 import java.util.function.Consumer; 109 import java.util.function.Consumer;
@@ -119,6 +122,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -119,6 +122,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
119 private final Map<UUID, SessionInfo> attributeSubscriptions; 122 private final Map<UUID, SessionInfo> attributeSubscriptions;
120 private final Map<UUID, SessionInfo> rpcSubscriptions; 123 private final Map<UUID, SessionInfo> rpcSubscriptions;
121 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap; 124 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
  125 + private final boolean rpcSequential;
122 126
123 private int rpcSeq = 0; 127 private int rpcSeq = 0;
124 private String deviceName; 128 private String deviceName;
@@ -130,9 +134,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -130,9 +134,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
130 super(systemContext); 134 super(systemContext);
131 this.tenantId = tenantId; 135 this.tenantId = tenantId;
132 this.deviceId = deviceId; 136 this.deviceId = deviceId;
  137 + this.rpcSequential = systemContext.isRpcSequential();
133 this.attributeSubscriptions = new HashMap<>(); 138 this.attributeSubscriptions = new HashMap<>();
134 this.rpcSubscriptions = new HashMap<>(); 139 this.rpcSubscriptions = new HashMap<>();
135 - this.toDeviceRpcPendingMap = new HashMap<>(); 140 + this.toDeviceRpcPendingMap = new LinkedHashMap<>();
136 this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit); 141 this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit);
137 if (initAttributes()) { 142 if (initAttributes()) {
138 restoreSessions(); 143 restoreSessions();
@@ -183,19 +188,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -183,19 +188,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
183 if (timeout <= 0) { 188 if (timeout <= 0) {
184 log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime()); 189 log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
185 if (persisted) { 190 if (persisted) {
186 - createRpc(request, RpcStatus.TIMEOUT); 191 + createRpc(request, RpcStatus.EXPIRED);
187 } 192 }
188 return; 193 return;
189 } else if (persisted) { 194 } else if (persisted) {
190 createRpc(request, RpcStatus.QUEUED); 195 createRpc(request, RpcStatus.QUEUED);
191 } 196 }
192 197
193 - boolean sent; 198 + boolean sent = false;
194 if (systemContext.isEdgesEnabled() && edgeId != null) { 199 if (systemContext.isEdgesEnabled() && edgeId != null) {
195 log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId()); 200 log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId());
196 saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()); 201 saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
197 sent = true; 202 sent = true;
198 - } else { 203 + } else if (isSendNewRpcAvailable()) {
199 sent = rpcSubscriptions.size() > 0; 204 sent = rpcSubscriptions.size() > 0;
200 Set<UUID> syncSessionSet = new HashSet<>(); 205 Set<UUID> syncSessionSet = new HashSet<>();
201 rpcSubscriptions.forEach((key, value) -> { 206 rpcSubscriptions.forEach((key, value) -> {
@@ -227,6 +232,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -227,6 +232,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
227 } 232 }
228 } 233 }
229 234
  235 + private boolean isSendNewRpcAvailable() {
  236 + return !rpcSequential || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
  237 + }
  238 +
230 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) { 239 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
231 Rpc rpc = new Rpc(new RpcId(request.getId())); 240 Rpc rpc = new Rpc(new RpcId(request.getId()));
232 rpc.setCreatedTime(System.currentTimeMillis()); 241 rpc.setCreatedTime(System.currentTimeMillis());
@@ -266,16 +275,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -266,16 +275,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
266 275
267 void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) { 276 void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) {
268 log.debug("[{}] Processing remove rpc command", msg.getRequestId()); 277 log.debug("[{}] Processing remove rpc command", msg.getRequestId());
269 - Integer requestId = null;  
270 - for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry : toDeviceRpcPendingMap.entrySet()) {  
271 - if (entry.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {  
272 - requestId = entry.getKey(); 278 + Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry = null;
  279 + for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> e : toDeviceRpcPendingMap.entrySet()) {
  280 + if (e.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
  281 + entry = e;
273 break; 282 break;
274 } 283 }
275 } 284 }
276 285
277 - if (requestId != null) {  
278 - toDeviceRpcPendingMap.remove(requestId); 286 + if (entry != null) {
  287 + if (entry.getValue().isDelivered()) {
  288 + toDeviceRpcPendingMap.remove(entry.getKey());
  289 + } else {
  290 + Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> firstRpc = getFirstRpc();
  291 + if (firstRpc.isPresent() && entry.getKey().equals(firstRpc.get().getKey())) {
  292 + toDeviceRpcPendingMap.remove(entry.getKey());
  293 + sendNextPendingRequest(context);
  294 + } else {
  295 + toDeviceRpcPendingMap.remove(entry.getKey());
  296 + }
  297 + }
279 } 298 }
280 } 299 }
281 300
@@ -290,14 +309,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -290,14 +309,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
290 if (requestMd != null) { 309 if (requestMd != null) {
291 log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); 310 log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
292 if (requestMd.getMsg().getMsg().isPersisted()) { 311 if (requestMd.getMsg().getMsg().isPersisted()) {
293 - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null); 312 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.EXPIRED, null);
294 } 313 }
295 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), 314 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
296 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); 315 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
  316 + if (!requestMd.isDelivered()) {
  317 + sendNextPendingRequest(context);
  318 + }
297 } 319 }
298 } 320 }
299 321
300 - private void sendPendingRequests(TbActorCtx context, UUID sessionId, SessionInfoProto sessionInfo) { 322 + private void sendPendingRequests(TbActorCtx context, UUID sessionId, String nodeId) {
301 SessionType sessionType = getSessionType(sessionId); 323 SessionType sessionType = getSessionType(sessionId);
302 if (!toDeviceRpcPendingMap.isEmpty()) { 324 if (!toDeviceRpcPendingMap.isEmpty()) {
303 log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); 325 log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
@@ -309,20 +331,33 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -309,20 +331,33 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
309 log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId); 331 log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
310 } 332 }
311 Set<Integer> sentOneWayIds = new HashSet<>(); 333 Set<Integer> sentOneWayIds = new HashSet<>();
312 - if (sessionType == SessionType.ASYNC) {  
313 - toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds)); 334 +
  335 + if (rpcSequential) {
  336 + getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  337 + } else if (sessionType == SessionType.ASYNC) {
  338 + toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
314 } else { 339 } else {
315 - toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds)); 340 + toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
316 } 341 }
317 342
318 sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove); 343 sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove);
319 } 344 }
320 345
  346 + private Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> getFirstRpc() {
  347 + return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst();
  348 + }
  349 +
  350 + private void sendNextPendingRequest(TbActorCtx context) {
  351 + if (rpcSequential) {
  352 + rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId()));
  353 + }
  354 + }
  355 +
321 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) { 356 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
322 return entry -> { 357 return entry -> {
323 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg(); 358 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
324 ToDeviceRpcRequestBody body = request.getBody(); 359 ToDeviceRpcRequestBody body = request.getBody();
325 - if (request.isOneway()) { 360 + if (request.isOneway() && !rpcSequential) {
326 sentOneWayIds.add(entry.getKey()); 361 sentOneWayIds.add(entry.getKey());
327 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null)); 362 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null));
328 } 363 }
@@ -355,7 +390,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -355,7 +390,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
355 processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC()); 390 processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
356 } 391 }
357 if (msg.hasSendPendingRPC()) { 392 if (msg.hasSendPendingRPC()) {
358 - sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo); 393 + sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo.getNodeId());
359 } 394 }
360 if (msg.hasGetAttributes()) { 395 if (msg.hasGetAttributes()) {
361 handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes()); 396 handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
@@ -369,8 +404,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -369,8 +404,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
369 if (msg.hasClaimDevice()) { 404 if (msg.hasClaimDevice()) {
370 handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice()); 405 handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
371 } 406 }
372 - if (msg.hasPersistedRpcResponseMsg()) {  
373 - processPersistedRpcResponses(context, sessionInfo, msg.getPersistedRpcResponseMsg()); 407 + if (msg.hasRpcResponseStatusMsg()) {
  408 + processRpcResponseStatus(context, sessionInfo, msg.getRpcResponseStatusMsg());
374 } 409 }
375 if (msg.hasUplinkNotificationMsg()) { 410 if (msg.hasUplinkNotificationMsg()) {
376 processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg()); 411 processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
@@ -530,38 +565,63 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -530,38 +565,63 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
530 boolean success = requestMd != null; 565 boolean success = requestMd != null;
531 if (success) { 566 if (success) {
532 boolean hasError = StringUtils.isNotEmpty(responseMsg.getError()); 567 boolean hasError = StringUtils.isNotEmpty(responseMsg.getError());
533 - String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();  
534 - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(  
535 - new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),  
536 - payload, null));  
537 - if (requestMd.getMsg().getMsg().isPersisted()) {  
538 - RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;  
539 - JsonNode response;  
540 - try {  
541 - response = JacksonUtil.toJsonNode(payload);  
542 - } catch (IllegalArgumentException e) {  
543 - response = JacksonUtil.newObjectNode().put("error", payload); 568 + try {
  569 + String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
  570 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(
  571 + new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  572 + payload, null));
  573 + if (requestMd.getMsg().getMsg().isPersisted()) {
  574 + RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
  575 + JsonNode response;
  576 + try {
  577 + response = JacksonUtil.toJsonNode(payload);
  578 + } catch (IllegalArgumentException e) {
  579 + response = JacksonUtil.newObjectNode().put("error", payload);
  580 + }
  581 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
  582 + }
  583 + } finally {
  584 + if (hasError) {
  585 + sendNextPendingRequest(context);
544 } 586 }
545 - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);  
546 } 587 }
547 } else { 588 } else {
548 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); 589 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
549 } 590 }
550 } 591 }
551 592
552 - private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) { 593 + private void processRpcResponseStatus(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseStatusMsg responseMsg) {
553 UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB()); 594 UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
554 RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus()); 595 RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus());
555 -  
556 - ToDeviceRpcRequestMetadata md;  
557 - if (RpcStatus.DELIVERED.equals(status)) {  
558 - md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());  
559 - } else {  
560 - md = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());  
561 - } 596 + ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
562 597
563 if (md != null) { 598 if (md != null) {
564 - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null); 599 + if (status.equals(RpcStatus.DELIVERED)) {
  600 + if (md.getMsg().getMsg().isOneway()) {
  601 + toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  602 + if (rpcSequential) {
  603 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
  604 + }
  605 + } else {
  606 + md.setDelivered(true);
  607 + }
  608 + } else if (status.equals(RpcStatus.TIMEOUT)) {
  609 + Integer maxRpcRetries = md.getMsg().getMsg().getRetries();
  610 + maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
  611 + if (maxRpcRetries <= md.getRetries()) {
  612 + toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  613 + status = RpcStatus.FAILED;
  614 + } else {
  615 + md.setRetries(md.getRetries() + 1);
  616 + }
  617 + }
  618 +
  619 + if (md.getMsg().getMsg().isPersisted()) {
  620 + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null);
  621 + }
  622 + if (status != RpcStatus.SENT) {
  623 + sendNextPendingRequest(context);
  624 + }
565 } else { 625 } else {
566 log.info("[{}][{}] Rpc has already removed from pending map.", deviceId, rpcId); 626 log.info("[{}][{}] Rpc has already removed from pending map.", deviceId, rpcId);
567 } 627 }
@@ -601,7 +661,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -601,7 +661,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
601 sessionMD.setSubscribedToRPC(true); 661 sessionMD.setSubscribedToRPC(true);
602 log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); 662 log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
603 rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); 663 rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
604 - sendPendingRequests(context, sessionId, sessionInfo); 664 + sendPendingRequests(context, sessionId, sessionInfo.getNodeId());
605 dumpSessions(); 665 dumpSessions();
606 } 666 }
607 } 667 }
@@ -871,7 +931,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -871,7 +931,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
871 931
872 void init(TbActorCtx ctx) { 932 void init(TbActorCtx ctx) {
873 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout()); 933 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
874 - PageLink pageLink = new PageLink(1024); 934 + PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
875 PageData<Rpc> pageData; 935 PageData<Rpc> pageData;
876 do { 936 do {
877 pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink); 937 pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
@@ -879,7 +939,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -879,7 +939,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
879 ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class); 939 ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
880 long timeout = rpc.getExpirationTime() - System.currentTimeMillis(); 940 long timeout = rpc.getExpirationTime() - System.currentTimeMillis();
881 if (timeout <= 0) { 941 if (timeout <= 0) {
882 - rpc.setStatus(RpcStatus.TIMEOUT); 942 + rpc.setStatus(RpcStatus.EXPIRED);
883 systemContext.getTbRpcService().save(tenantId, rpc); 943 systemContext.getTbRpcService().save(tenantId, rpc);
884 } else { 944 } else {
885 registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout); 945 registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
@@ -25,4 +25,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; @@ -25,4 +25,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
25 public class ToDeviceRpcRequestMetadata { 25 public class ToDeviceRpcRequestMetadata {
26 private final ToDeviceRpcRequestActorMsg msg; 26 private final ToDeviceRpcRequestActorMsg msg;
27 private final boolean sent; 27 private final boolean sent;
  28 + private int retries;
  29 + private boolean delivered;
28 } 30 }
@@ -75,11 +75,12 @@ public abstract class AbstractRpcController extends BaseController { @@ -75,11 +75,12 @@ public abstract class AbstractRpcController extends BaseController {
75 SecurityUser currentUser = getCurrentUser(); 75 SecurityUser currentUser = getCurrentUser();
76 TenantId tenantId = currentUser.getTenantId(); 76 TenantId tenantId = currentUser.getTenantId();
77 final DeferredResult<ResponseEntity> response = new DeferredResult<>(); 77 final DeferredResult<ResponseEntity> response = new DeferredResult<>();
78 - long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout;  
79 - long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout); 78 + long timeout = rpcRequestBody.has(DataConstants.TIMEOUT) ? rpcRequestBody.get(DataConstants.TIMEOUT).asLong() : defaultTimeout;
  79 + long expTime = rpcRequestBody.has(DataConstants.EXPIRATION_TIME) ? rpcRequestBody.get(DataConstants.EXPIRATION_TIME).asLong() : System.currentTimeMillis() + Math.max(minTimeout, timeout);
80 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID(); 80 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
81 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean(); 81 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean();
82 String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO)); 82 String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO));
  83 + Integer retries = rpcRequestBody.has(DataConstants.RETRIES) ? rpcRequestBody.get(DataConstants.RETRIES).asInt() : null;
83 accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<>() { 84 accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<>() {
84 @Override 85 @Override
85 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { 86 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
@@ -90,6 +91,7 @@ public abstract class AbstractRpcController extends BaseController { @@ -90,6 +91,7 @@ public abstract class AbstractRpcController extends BaseController {
90 expTime, 91 expTime,
91 body, 92 body,
92 persisted, 93 persisted,
  94 + retries,
93 additionalInfo 95 additionalInfo
94 ); 96 );
95 deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus), currentUser); 97 deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus), currentUser);
@@ -166,6 +166,11 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @@ -166,6 +166,11 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
166 metaData.putValue("oneway", Boolean.toString(msg.isOneway())); 166 metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
167 metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(msg.isPersisted())); 167 metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(msg.isPersisted()));
168 168
  169 + if (msg.getRetries() != null) {
  170 + metaData.putValue(DataConstants.RETRIES, msg.getRetries().toString());
  171 + }
  172 +
  173 +
169 Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId()); 174 Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId());
170 if (device != null) { 175 if (device != null) {
171 metaData.putValue("deviceName", device.getName()); 176 metaData.putValue("deviceName", device.getName());
@@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
101 @Override 101 @Override
102 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) { 102 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
103 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), 103 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
104 - src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo()); 104 + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getRetries(), src.getAdditionalInfo());
105 forwardRpcRequestToDeviceActor(request, response -> { 105 forwardRpcRequestToDeviceActor(request, response -> {
106 if (src.isRestApiCall()) { 106 if (src.isRestApiCall()) {
107 sendRpcResponseToTbCore(src.getOriginServiceId(), response); 107 sendRpcResponseToTbCore(src.getOriginServiceId(), response);
@@ -326,6 +326,9 @@ actors: @@ -326,6 +326,9 @@ actors:
326 queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}" 326 queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}"
327 # Time in milliseconds for transaction to complete 327 # Time in milliseconds for transaction to complete
328 duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}" 328 duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}"
  329 + rpc:
  330 + max_retries: "${ACTORS_RPC_MAX_RETRIES:5}"
  331 + sequential: "${ACTORS_RPC_SEQUENTIAL:false}"
329 statistics: 332 statistics:
330 # Enable/disable actor statistics 333 # Enable/disable actor statistics
331 enabled: "${ACTORS_STATISTICS_ENABLED:true}" 334 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -699,7 +702,7 @@ transport: @@ -699,7 +702,7 @@ transport:
699 702
700 # Edges parameters 703 # Edges parameters
701 edges: 704 edges:
702 - enabled: "${EDGES_ENABLED:false}" 705 + enabled: "${EDGES_ENABLED:true}"
703 rpc: 706 rpc:
704 port: "${EDGES_RPC_PORT:7070}" 707 port: "${EDGES_RPC_PORT:7070}"
705 client_max_keep_alive_time_sec: "${EDGES_RPC_CLIENT_MAX_KEEP_ALIVE_TIME_SEC:300}" 708 client_max_keep_alive_time_sec: "${EDGES_RPC_CLIENT_MAX_KEEP_ALIVE_TIME_SEC:300}"
@@ -90,6 +90,11 @@ public abstract class AbstractMqttServerSideRpcDefaultIntegrationTest extends Ab @@ -90,6 +90,11 @@ public abstract class AbstractMqttServerSideRpcDefaultIntegrationTest extends Ab
90 } 90 }
91 91
92 @Test 92 @Test
  93 + public void testSequenceServerMqttTwoWayRpc() throws Exception {
  94 + processSequenceTwoWayRpcTest();
  95 + }
  96 +
  97 + @Test
93 public void testGatewayServerMqttOneWayRpc() throws Exception { 98 public void testGatewayServerMqttOneWayRpc() throws Exception {
94 processOneWayRpcTestGateway("Gateway Device OneWay RPC"); 99 processOneWayRpcTestGateway("Gateway Device OneWay RPC");
95 } 100 }
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.transport.mqtt.rpc; 16 package org.thingsboard.server.transport.mqtt.rpc;
17 17
18 import com.fasterxml.jackson.databind.JsonNode; 18 import com.fasterxml.jackson.databind.JsonNode;
  19 +import com.fasterxml.jackson.databind.node.ObjectNode;
19 import com.google.protobuf.InvalidProtocolBufferException; 20 import com.google.protobuf.InvalidProtocolBufferException;
20 import com.nimbusds.jose.util.StandardCharset; 21 import com.nimbusds.jose.util.StandardCharset;
21 import io.netty.handler.codec.mqtt.MqttQoS; 22 import io.netty.handler.codec.mqtt.MqttQoS;
@@ -27,13 +28,16 @@ import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -27,13 +28,16 @@ import org.eclipse.paho.client.mqttv3.MqttCallback;
27 import org.eclipse.paho.client.mqttv3.MqttException; 28 import org.eclipse.paho.client.mqttv3.MqttException;
28 import org.eclipse.paho.client.mqttv3.MqttMessage; 29 import org.eclipse.paho.client.mqttv3.MqttMessage;
29 import org.junit.Assert; 30 import org.junit.Assert;
  31 +import org.thingsboard.common.util.JacksonUtil;
30 import org.thingsboard.server.common.data.Device; 32 import org.thingsboard.server.common.data.Device;
31 import org.thingsboard.server.common.data.TransportPayloadType; 33 import org.thingsboard.server.common.data.TransportPayloadType;
32 import org.thingsboard.server.common.data.device.profile.MqttTopics; 34 import org.thingsboard.server.common.data.device.profile.MqttTopics;
33 -import org.thingsboard.common.util.JacksonUtil;  
34 import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; 35 import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
35 36
  37 +import java.util.ArrayList;
36 import java.util.Arrays; 38 import java.util.Arrays;
  39 +import java.util.List;
  40 +import java.util.concurrent.CopyOnWriteArrayList;
37 import java.util.concurrent.CountDownLatch; 41 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.TimeUnit;
39 43
@@ -101,6 +105,32 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @@ -101,6 +105,32 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
101 Assert.assertEquals(expected, result); 105 Assert.assertEquals(expected, result);
102 } 106 }
103 107
  108 + protected void processSequenceTwoWayRpcTest() throws Exception {
  109 + List<String> expected = new ArrayList<>();
  110 + List<String> result = new ArrayList<>();
  111 +
  112 + String deviceId = savedDevice.getId().getId().toString();
  113 +
  114 + for (int i = 0; i < 10; i++) {
  115 + ObjectNode request = JacksonUtil.newObjectNode();
  116 + request.put("method", "test");
  117 + request.put("params", i);
  118 + expected.add(JacksonUtil.toString(request));
  119 + request.put("persistent", true);
  120 + doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk());
  121 + }
  122 +
  123 + MqttAsyncClient client = getMqttAsyncClient(accessToken);
  124 + client.setManualAcks(true);
  125 + CountDownLatch latch = new CountDownLatch(10);
  126 + TestSequenceMqttCallback callback = new TestSequenceMqttCallback(client, latch, result);
  127 + client.setCallback(callback);
  128 + client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
  129 +
  130 + latch.await(10, TimeUnit.SECONDS);
  131 + Assert.assertEquals(expected, result);
  132 + }
  133 +
104 protected void processTwoWayRpcTestGateway(String deviceName) throws Exception { 134 protected void processTwoWayRpcTestGateway(String deviceName) throws Exception {
105 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); 135 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
106 136
@@ -213,4 +243,38 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @@ -213,4 +243,38 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
213 243
214 } 244 }
215 } 245 }
  246 +
  247 + protected class TestSequenceMqttCallback implements MqttCallback {
  248 +
  249 + private final MqttAsyncClient client;
  250 + private final CountDownLatch latch;
  251 + private final List<String> expected;
  252 +
  253 + TestSequenceMqttCallback(MqttAsyncClient client, CountDownLatch latch, List<String> expected) {
  254 + this.client = client;
  255 + this.latch = latch;
  256 + this.expected = expected;
  257 + }
  258 +
  259 + @Override
  260 + public void connectionLost(Throwable throwable) {
  261 + }
  262 +
  263 + @Override
  264 + public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception {
  265 + log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload()));
  266 + expected.add(new String(mqttMessage.getPayload()));
  267 + String responseTopic = requestTopic.replace("request", "response");
  268 + var qoS = mqttMessage.getQos();
  269 +
  270 + client.messageArrivedComplete(mqttMessage.getId(), qoS);
  271 + client.publish(responseTopic, processMessageArrived(requestTopic, mqttMessage));
  272 + latch.countDown();
  273 + }
  274 +
  275 + @Override
  276 + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  277 +
  278 + }
  279 + }
216 } 280 }
@@ -6,4 +6,5 @@ edges.storage.sleep_between_batches=500 @@ -6,4 +6,5 @@ edges.storage.sleep_between_batches=500
6 transport.lwm2m.server.security.key_alias=server 6 transport.lwm2m.server.security.key_alias=server
7 transport.lwm2m.server.security.key_password=server 7 transport.lwm2m.server.security.key_password=server
8 transport.lwm2m.bootstrap.security.key_alias=server 8 transport.lwm2m.bootstrap.security.key_alias=server
9 -transport.lwm2m.bootstrap.security.key_password=server  
  9 +transport.lwm2m.bootstrap.security.key_password=server
  10 +actors.rpc.sequential=true
@@ -346,7 +346,7 @@ message UplinkNotificationMsg { @@ -346,7 +346,7 @@ message UplinkNotificationMsg {
346 int64 uplinkTs = 1; 346 int64 uplinkTs = 1;
347 } 347 }
348 348
349 -message ToDevicePersistedRpcResponseMsg { 349 +message ToDeviceRpcResponseStatusMsg {
350 int32 requestId = 1; 350 int32 requestId = 1;
351 int64 requestIdMSB = 2; 351 int64 requestIdMSB = 2;
352 int64 requestIdLSB = 3; 352 int64 requestIdLSB = 3;
@@ -456,7 +456,7 @@ message TransportToDeviceActorMsg { @@ -456,7 +456,7 @@ message TransportToDeviceActorMsg {
456 SubscriptionInfoProto subscriptionInfo = 7; 456 SubscriptionInfoProto subscriptionInfo = 7;
457 ClaimDeviceMsg claimDevice = 8; 457 ClaimDeviceMsg claimDevice = 8;
458 ProvisionDeviceRequestMsg provisionDevice = 9; 458 ProvisionDeviceRequestMsg provisionDevice = 9;
459 - ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10; 459 + ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10;
460 SendPendingRPCMsg sendPendingRPC = 11; 460 SendPendingRPCMsg sendPendingRPC = 11;
461 UplinkNotificationMsg uplinkNotificationMsg = 12; 461 UplinkNotificationMsg uplinkNotificationMsg = 12;
462 } 462 }
@@ -36,7 +36,10 @@ public class DataConstants { @@ -36,7 +36,10 @@ public class DataConstants {
36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats"; 36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats";
37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration"; 37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration";
38 public static final String PERSISTENT = "persistent"; 38 public static final String PERSISTENT = "persistent";
  39 + public static final String TIMEOUT = "timeout";
  40 + public static final String EXPIRATION_TIME = "expirationTime";
39 public static final String ADDITIONAL_INFO = "additionalInfo"; 41 public static final String ADDITIONAL_INFO = "additionalInfo";
  42 + public static final String RETRIES = "retries";
40 public static final String COAP_TRANSPORT_NAME = "COAP"; 43 public static final String COAP_TRANSPORT_NAME = "COAP";
41 public static final String LWM2M_TRANSPORT_NAME = "LWM2M"; 44 public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
42 public static final String MQTT_TRANSPORT_NAME = "MQTT"; 45 public static final String MQTT_TRANSPORT_NAME = "MQTT";
@@ -85,9 +88,11 @@ public class DataConstants { @@ -85,9 +88,11 @@ public class DataConstants {
85 public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE"; 88 public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
86 89
87 public static final String RPC_QUEUED = "RPC_QUEUED"; 90 public static final String RPC_QUEUED = "RPC_QUEUED";
  91 + public static final String RPC_SENT = "RPC_SENT";
88 public static final String RPC_DELIVERED = "RPC_DELIVERED"; 92 public static final String RPC_DELIVERED = "RPC_DELIVERED";
89 public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; 93 public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL";
90 public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; 94 public static final String RPC_TIMEOUT = "RPC_TIMEOUT";
  95 + public static final String RPC_EXPIRED = "RPC_EXPIRED";
91 public static final String RPC_FAILED = "RPC_FAILED"; 96 public static final String RPC_FAILED = "RPC_FAILED";
92 public static final String RPC_DELETED = "RPC_DELETED"; 97 public static final String RPC_DELETED = "RPC_DELETED";
93 98
@@ -16,5 +16,5 @@ @@ -16,5 +16,5 @@
16 package org.thingsboard.server.common.data.rpc; 16 package org.thingsboard.server.common.data.rpc;
17 17
18 public enum RpcStatus { 18 public enum RpcStatus {
19 - QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED 19 + QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, EXPIRED, FAILED
20 } 20 }
@@ -36,6 +36,7 @@ public class ToDeviceRpcRequest implements Serializable { @@ -36,6 +36,7 @@ public class ToDeviceRpcRequest implements Serializable {
36 private final long expirationTime; 36 private final long expirationTime;
37 private final ToDeviceRpcRequestBody body; 37 private final ToDeviceRpcRequestBody body;
38 private final boolean persisted; 38 private final boolean persisted;
  39 + private final Integer retries;
39 @JsonIgnore 40 @JsonIgnore
40 private final String additionalInfo; 41 private final String additionalInfo;
41 } 42 }
@@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo @@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo
42 import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration; 42 import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
43 import org.thingsboard.server.common.data.id.DeviceId; 43 import org.thingsboard.server.common.data.id.DeviceId;
44 import org.thingsboard.server.common.data.id.DeviceProfileId; 44 import org.thingsboard.server.common.data.id.DeviceProfileId;
  45 +import org.thingsboard.server.common.data.rpc.RpcStatus;
45 import org.thingsboard.server.common.msg.session.FeatureType; 46 import org.thingsboard.server.common.msg.session.FeatureType;
46 import org.thingsboard.server.common.msg.session.SessionMsgType; 47 import org.thingsboard.server.common.msg.session.SessionMsgType;
47 import org.thingsboard.server.common.transport.SessionMsgListener; 48 import org.thingsboard.server.common.transport.SessionMsgListener;
@@ -192,29 +193,7 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -192,29 +193,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
192 client.lock(); 193 client.lock();
193 try { 194 try {
194 long uplinkTime = client.updateLastUplinkTime(uplinkTs); 195 long uplinkTime = client.updateLastUplinkTime(uplinkTs);
195 - long timeout;  
196 - if (PowerMode.PSM.equals(powerMode)) {  
197 - Long psmActivityTimer = client.getPsmActivityTimer();  
198 - if (psmActivityTimer == null && profileSettings != null) {  
199 - psmActivityTimer = profileSettings.getPsmActivityTimer();  
200 -  
201 - }  
202 - if (psmActivityTimer == null || psmActivityTimer == 0L) {  
203 - psmActivityTimer = config.getPsmActivityTimer();  
204 - }  
205 -  
206 - timeout = psmActivityTimer;  
207 - } else {  
208 - Long pagingTransmissionWindow = client.getPagingTransmissionWindow();  
209 - if (pagingTransmissionWindow == null && profileSettings != null) {  
210 - pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow();  
211 -  
212 - }  
213 - if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) {  
214 - pagingTransmissionWindow = config.getPagingTransmissionWindow();  
215 - }  
216 - timeout = pagingTransmissionWindow;  
217 - } 196 + long timeout = getTimeout(client, powerMode, profileSettings);
218 Future<Void> sleepTask = client.getSleepTask(); 197 Future<Void> sleepTask = client.getSleepTask();
219 if (sleepTask != null) { 198 if (sleepTask != null) {
220 sleepTask.cancel(false); 199 sleepTask.cancel(false);
@@ -234,6 +213,33 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -234,6 +213,33 @@ public class DefaultCoapClientContext implements CoapClientContext {
234 } 213 }
235 } 214 }
236 215
  216 + private long getTimeout(TbCoapClientState client, PowerMode powerMode, PowerSavingConfiguration profileSettings) {
  217 + long timeout;
  218 + if (PowerMode.PSM.equals(powerMode)) {
  219 + Long psmActivityTimer = client.getPsmActivityTimer();
  220 + if (psmActivityTimer == null && profileSettings != null) {
  221 + psmActivityTimer = profileSettings.getPsmActivityTimer();
  222 +
  223 + }
  224 + if (psmActivityTimer == null || psmActivityTimer == 0L) {
  225 + psmActivityTimer = config.getPsmActivityTimer();
  226 + }
  227 +
  228 + timeout = psmActivityTimer;
  229 + } else {
  230 + Long pagingTransmissionWindow = client.getPagingTransmissionWindow();
  231 + if (pagingTransmissionWindow == null && profileSettings != null) {
  232 + pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow();
  233 +
  234 + }
  235 + if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) {
  236 + pagingTransmissionWindow = config.getPagingTransmissionWindow();
  237 + }
  238 + timeout = pagingTransmissionWindow;
  239 + }
  240 + return timeout;
  241 + }
  242 +
237 private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) { 243 private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) {
238 state.lock(); 244 state.lock();
239 try { 245 try {
@@ -524,17 +530,38 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -524,17 +530,38 @@ public class DefaultCoapClientContext implements CoapClientContext {
524 Response response = state.getAdaptor().convertToPublish(conRequest, msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder()); 530 Response response = state.getAdaptor().convertToPublish(conRequest, msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder());
525 int requestId = getNextMsgId(); 531 int requestId = getNextMsgId();
526 response.setMID(requestId); 532 response.setMID(requestId);
527 - if (msg.getPersisted() && conRequest) { 533 + if (conRequest) {
  534 + PowerMode powerMode = state.getPowerMode();
  535 + PowerSavingConfiguration profileSettings = null;
  536 + if (powerMode == null) {
  537 + var clientProfile = getProfile(state.getProfileId());
  538 + if (clientProfile.isPresent()) {
  539 + profileSettings = clientProfile.get().getClientSettings();
  540 + if (profileSettings != null) {
  541 + powerMode = profileSettings.getPowerMode();
  542 + }
  543 + }
  544 + }
  545 +
528 transportContext.getRpcAwaitingAck().put(requestId, msg); 546 transportContext.getRpcAwaitingAck().put(requestId, msg);
529 transportContext.getScheduler().schedule(() -> { 547 transportContext.getScheduler().schedule(() -> {
530 - transportContext.getRpcAwaitingAck().remove(requestId);  
531 - }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); 548 + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(requestId);
  549 + if (rpcRequestMsg != null) {
  550 + transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  551 + }
  552 + }, Math.min(getTimeout(state, powerMode, profileSettings), msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
  553 +
532 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { 554 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
533 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); 555 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
534 if (rpcRequestMsg != null) { 556 if (rpcRequestMsg != null) {
535 - transportService.process(state.getSession(), rpcRequestMsg, TransportServiceCallback.EMPTY); 557 + transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
536 } 558 }
537 - }, null)); 559 + }, id -> {
  560 + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
  561 + if (rpcRequestMsg != null) {
  562 + transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  563 + }
  564 + }));
538 } 565 }
539 if (conRequest) { 566 if (conRequest) {
540 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state))); 567 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
@@ -553,8 +580,12 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -553,8 +580,12 @@ public class DefaultCoapClientContext implements CoapClientContext {
553 transportService.process(state.getSession(), 580 transportService.process(state.getSession(),
554 TransportProtos.ToDeviceRpcResponseMsg.newBuilder() 581 TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
555 .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY); 582 .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY);
556 - } else if (msg.getPersisted() && !conRequest && sent) {  
557 - transportService.process(state.getSession(), msg, TransportServiceCallback.EMPTY); 583 + } else if (sent) {
  584 + if (!conRequest) {
  585 + transportService.process(state.getSession(), msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  586 + } else if (msg.getPersisted()) {
  587 + transportService.process(state.getSession(), msg, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  588 + }
558 } 589 }
559 } 590 }
560 } 591 }
@@ -723,7 +754,7 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -723,7 +754,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
723 private void cancelRpcSubscription(TbCoapClientState state) { 754 private void cancelRpcSubscription(TbCoapClientState state) {
724 if (state.getRpc() != null) { 755 if (state.getRpc() != null) {
725 clientsByToken.remove(state.getRpc().getToken()); 756 clientsByToken.remove(state.getRpc().getToken());
726 - CoapExchange exchange = state.getAttrs().getExchange(); 757 + CoapExchange exchange = state.getRpc().getExchange();
727 state.setRpc(null); 758 state.setRpc(null);
728 transportService.process(state.getSession(), 759 transportService.process(state.getSession(),
729 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), 760 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
@@ -407,7 +407,7 @@ public class DeviceApiController implements TbTransportService { @@ -407,7 +407,7 @@ public class DeviceApiController implements TbTransportService {
407 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) { 407 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) {
408 log.trace("[{}] Received RPC command to device", sessionId); 408 log.trace("[{}] Received RPC command to device", sessionId);
409 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK)); 409 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
410 - transportService.process(sessionInfo, msg, TransportServiceCallback.EMPTY); 410 + transportService.process(sessionInfo, msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
411 } 411 }
412 412
413 @Override 413 @Override
@@ -21,7 +21,9 @@ import org.eclipse.leshan.core.ResponseCode; @@ -21,7 +21,9 @@ import org.eclipse.leshan.core.ResponseCode;
21 import org.springframework.stereotype.Service; 21 import org.springframework.stereotype.Service;
22 import org.thingsboard.common.util.JacksonUtil; 22 import org.thingsboard.common.util.JacksonUtil;
23 import org.thingsboard.server.common.data.StringUtils; 23 import org.thingsboard.server.common.data.StringUtils;
  24 +import org.thingsboard.server.common.data.rpc.RpcStatus;
24 import org.thingsboard.server.common.transport.TransportService; 25 import org.thingsboard.server.common.transport.TransportService;
  26 +import org.thingsboard.server.common.transport.TransportServiceCallback;
25 import org.thingsboard.server.gen.transport.TransportProtos; 27 import org.thingsboard.server.gen.transport.TransportProtos;
26 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; 28 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
27 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; 29 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
@@ -158,6 +160,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { @@ -158,6 +160,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
158 throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); 160 throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
159 } 161 }
160 } 162 }
  163 + transportService.process(client.getSession(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
161 } catch (IllegalArgumentException e) { 164 } catch (IllegalArgumentException e) {
162 this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage()); 165 this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage());
163 } 166 }
@@ -19,6 +19,7 @@ import org.eclipse.leshan.core.ResponseCode; @@ -19,6 +19,7 @@ import org.eclipse.leshan.core.ResponseCode;
19 import org.eclipse.leshan.core.request.exception.ClientSleepingException; 19 import org.eclipse.leshan.core.request.exception.ClientSleepingException;
20 import org.thingsboard.common.util.JacksonUtil; 20 import org.thingsboard.common.util.JacksonUtil;
21 import org.thingsboard.server.common.data.StringUtils; 21 import org.thingsboard.server.common.data.StringUtils;
  22 +import org.thingsboard.server.common.data.rpc.RpcStatus;
22 import org.thingsboard.server.common.transport.TransportService; 23 import org.thingsboard.server.common.transport.TransportService;
23 import org.thingsboard.server.common.transport.TransportServiceCallback; 24 import org.thingsboard.server.common.transport.TransportServiceCallback;
24 import org.thingsboard.server.gen.transport.TransportProtos; 25 import org.thingsboard.server.gen.transport.TransportProtos;
@@ -44,7 +45,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR @@ -44,7 +45,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
44 45
45 @Override 46 @Override
46 public void onSuccess(R request, T response) { 47 public void onSuccess(R request, T response) {
47 - transportService.process(client.getSession(), this.request, TransportServiceCallback.EMPTY); 48 + transportService.process(client.getSession(), this.request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
48 sendRpcReplyOnSuccess(response); 49 sendRpcReplyOnSuccess(response);
49 if (callback != null) { 50 if (callback != null) {
50 callback.onSuccess(request, response); 51 callback.onSuccess(request, response);
@@ -61,7 +62,9 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR @@ -61,7 +62,9 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
61 62
62 @Override 63 @Override
63 public void onError(String params, Exception e) { 64 public void onError(String params, Exception e) {
64 - if (!(e instanceof TimeoutException || e instanceof ClientSleepingException)) { 65 + if (e instanceof TimeoutException) {
  66 + transportService.process(client.getSession(), this.request, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  67 + } else if (!(e instanceof ClientSleepingException)) {
65 sendRpcReplyOnError(e); 68 sendRpcReplyOnError(e);
66 } 69 }
67 if (callback != null) { 70 if (callback != null) {
@@ -68,4 +68,7 @@ public class MqttTransportContext extends TransportContext { @@ -68,4 +68,7 @@ public class MqttTransportContext extends TransportContext {
68 @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}") 68 @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}")
69 private int messageQueueSizePerDeviceLimit; 69 private int messageQueueSizePerDeviceLimit;
70 70
  71 + @Getter
  72 + @Value("${transport.mqtt.timeout:10000}")
  73 + private long timeout;
71 } 74 }
@@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.TransportPayloadType; @@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.TransportPayloadType;
50 import org.thingsboard.server.common.data.device.profile.MqttTopics; 50 import org.thingsboard.server.common.data.device.profile.MqttTopics;
51 import org.thingsboard.server.common.data.id.OtaPackageId; 51 import org.thingsboard.server.common.data.id.OtaPackageId;
52 import org.thingsboard.server.common.data.ota.OtaPackageType; 52 import org.thingsboard.server.common.data.ota.OtaPackageType;
  53 +import org.thingsboard.server.common.data.rpc.RpcStatus;
53 import org.thingsboard.server.common.msg.EncryptionUtil; 54 import org.thingsboard.server.common.msg.EncryptionUtil;
54 import org.thingsboard.server.common.msg.tools.TbRateLimitsException; 55 import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
55 import org.thingsboard.server.common.transport.SessionMsgListener; 56 import org.thingsboard.server.common.transport.SessionMsgListener;
@@ -272,7 +273,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -272,7 +273,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
272 int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId(); 273 int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
273 TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId); 274 TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
274 if (rpcRequest != null) { 275 if (rpcRequest != null) {
275 - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY); 276 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
276 } 277 }
277 break; 278 break;
278 default: 279 default:
@@ -849,20 +850,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -849,20 +850,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
849 try { 850 try {
850 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> { 851 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
851 int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); 852 int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
852 - if (rpcRequest.getPersisted() && isAckExpected(payload)) { 853 + if (isAckExpected(payload)) {
853 rpcAwaitingAck.put(msgId, rpcRequest); 854 rpcAwaitingAck.put(msgId, rpcRequest);
854 context.getScheduler().schedule(() -> { 855 context.getScheduler().schedule(() -> {
855 - rpcAwaitingAck.remove(msgId);  
856 - }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); 856 + TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
  857 + if (msg != null) {
  858 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  859 + }
  860 + }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
857 } 861 }
858 var cf = publish(payload, deviceSessionCtx); 862 var cf = publish(payload, deviceSessionCtx);
859 - if (rpcRequest.getPersisted() && !isAckExpected(payload)) {  
860 - cf.addListener(result -> {  
861 - if (result.cause() == null) {  
862 - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY); 863 + cf.addListener(result -> {
  864 + if (result.cause() == null) {
  865 + if (!isAckExpected(payload)) {
  866 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  867 + } else if (rpcRequest.getPersisted()) {
  868 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
863 } 869 }
864 - });  
865 - } 870 + } else {
  871 + // TODO: send error
  872 + }
  873 + });
866 }); 874 });
867 } catch (Exception e) { 875 } catch (Exception e) {
868 transportService.process(deviceSessionCtx.getSessionInfo(), 876 transportService.process(deviceSessionCtx.getSessionInfo(),
@@ -16,8 +16,10 @@ @@ -16,8 +16,10 @@
16 package org.thingsboard.server.transport.mqtt.session; 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 import io.netty.channel.ChannelFuture; 18 import io.netty.channel.ChannelFuture;
  19 +import io.netty.handler.codec.mqtt.MqttMessage;
19 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
20 import org.thingsboard.server.common.data.DeviceProfile; 21 import org.thingsboard.server.common.data.DeviceProfile;
  22 +import org.thingsboard.server.common.data.rpc.RpcStatus;
21 import org.thingsboard.server.common.transport.SessionMsgListener; 23 import org.thingsboard.server.common.transport.SessionMsgListener;
22 import org.thingsboard.server.common.transport.TransportService; 24 import org.thingsboard.server.common.transport.TransportService;
23 import org.thingsboard.server.common.transport.TransportServiceCallback; 25 import org.thingsboard.server.common.transport.TransportServiceCallback;
@@ -102,9 +104,14 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -102,9 +104,14 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
102 payload -> { 104 payload -> {
103 ChannelFuture channelFuture = parent.writeAndFlush(payload); 105 ChannelFuture channelFuture = parent.writeAndFlush(payload);
104 if (request.getPersisted()) { 106 if (request.getPersisted()) {
105 - channelFuture.addListener(future -> {  
106 - if (future.cause() == null) {  
107 - transportService.process(getSessionInfo(), request, TransportServiceCallback.EMPTY); 107 + channelFuture.addListener(result -> {
  108 + if (result.cause() == null) {
  109 + if (!isAckExpected(payload)) {
  110 + transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  111 + } else if (request.getPersisted()) {
  112 + transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  113 +
  114 + }
108 } 115 }
109 }); 116 });
110 } 117 }
@@ -129,4 +136,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @@ -129,4 +136,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
129 // This feature is not supported in the TB IoT Gateway yet. 136 // This feature is not supported in the TB IoT Gateway yet.
130 } 137 }
131 138
  139 + private boolean isAckExpected(MqttMessage message) {
  140 + return message.fixedHeader().qosLevel().value() > 0;
  141 + }
  142 +
132 } 143 }
@@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.DeviceProfile; @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
26 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; 26 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
27 import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration; 27 import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
28 import org.thingsboard.server.common.data.id.DeviceId; 28 import org.thingsboard.server.common.data.id.DeviceId;
  29 +import org.thingsboard.server.common.data.rpc.RpcStatus;
29 import org.thingsboard.server.common.transport.SessionMsgListener; 30 import org.thingsboard.server.common.transport.SessionMsgListener;
30 import org.thingsboard.server.common.transport.TransportServiceCallback; 31 import org.thingsboard.server.common.transport.TransportServiceCallback;
31 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; 32 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
@@ -142,7 +143,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @@ -142,7 +143,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
142 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) { 143 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
143 log.trace("[{}] Received RPC command to device", sessionId); 144 log.trace("[{}] Received RPC command to device", sessionId);
144 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); 145 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
145 - snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, TransportServiceCallback.EMPTY); 146 + snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
146 } 147 }
147 148
148 @Override 149 @Override
@@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport; @@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport;
17 17
18 import org.thingsboard.server.common.data.DeviceProfile; 18 import org.thingsboard.server.common.data.DeviceProfile;
19 import org.thingsboard.server.common.data.DeviceTransportType; 19 import org.thingsboard.server.common.data.DeviceTransportType;
  20 +import org.thingsboard.server.common.data.rpc.RpcStatus;
20 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; 21 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
21 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; 22 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
22 import org.thingsboard.server.common.transport.service.SessionMetaData; 23 import org.thingsboard.server.common.transport.service.SessionMetaData;
@@ -112,7 +113,7 @@ public interface TransportService { @@ -112,7 +113,7 @@ public interface TransportService {
112 113
113 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback); 114 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
114 115
115 - void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, TransportServiceCallback<Void> callback); 116 + void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback);
116 117
117 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback); 118 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
118 119
@@ -589,22 +589,18 @@ public class DefaultTransportService implements TransportService { @@ -589,22 +589,18 @@ public class DefaultTransportService implements TransportService {
589 } 589 }
590 590
591 @Override 591 @Override
592 - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, TransportServiceCallback<Void> callback) {  
593 - if (msg.getPersisted()) {  
594 - RpcStatus status = msg.getOneway() ? RpcStatus.SUCCESSFUL : RpcStatus.DELIVERED;  
595 -  
596 - TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()  
597 - .setRequestId(msg.getRequestId())  
598 - .setRequestIdLSB(msg.getRequestIdLSB())  
599 - .setRequestIdMSB(msg.getRequestIdMSB())  
600 - .setStatus(status.name())  
601 - .build();  
602 -  
603 - if (checkLimits(sessionInfo, responseMsg, callback)) {  
604 - reportActivityInternal(sessionInfo);  
605 - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPersistedRpcResponseMsg(responseMsg).build(),  
606 - new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, TransportServiceCallback.EMPTY));  
607 - } 592 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback) {
  593 + TransportProtos.ToDeviceRpcResponseStatusMsg responseMsg = TransportProtos.ToDeviceRpcResponseStatusMsg.newBuilder()
  594 + .setRequestId(msg.getRequestId())
  595 + .setRequestIdLSB(msg.getRequestIdLSB())
  596 + .setRequestIdMSB(msg.getRequestIdMSB())
  597 + .setStatus(rpcStatus.name())
  598 + .build();
  599 +
  600 + if (checkLimits(sessionInfo, responseMsg, callback)) {
  601 + reportActivityInternal(sessionInfo);
  602 + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setRpcResponseStatusMsg(responseMsg).build(),
  603 + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, TransportServiceCallback.EMPTY));
608 } 604 }
609 } 605 }
610 606
@@ -41,5 +41,5 @@ public final class RuleEngineDeviceRpcRequest { @@ -41,5 +41,5 @@ public final class RuleEngineDeviceRpcRequest {
41 private final long expirationTime; 41 private final long expirationTime;
42 private final boolean restApiCall; 42 private final boolean restApiCall;
43 private final String additionalInfo; 43 private final String additionalInfo;
44 - 44 + private final Integer retries;
45 } 45 }
@@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
33 type = ComponentType.FILTER, 33 type = ComponentType.FILTER,
34 name = "message type switch", 34 name = "message type switch",
35 configClazz = EmptyNodeConfiguration.class, 35 configClazz = EmptyNodeConfiguration.class,
36 - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", "RPC Deleted", 36 + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Expired", "RPC Failed", "RPC Deleted",
37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", 37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", 38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
39 "Timeseries Updated", "Timeseries Deleted"}, 39 "Timeseries Updated", "Timeseries Deleted"},
@@ -97,12 +97,16 @@ public class TbMsgTypeSwitchNode implements TbNode { @@ -97,12 +97,16 @@ public class TbMsgTypeSwitchNode implements TbNode {
97 relationType = "Timeseries Deleted"; 97 relationType = "Timeseries Deleted";
98 } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) { 98 } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) {
99 relationType = "RPC Queued"; 99 relationType = "RPC Queued";
  100 + } else if (msg.getType().equals(DataConstants.RPC_SENT)) {
  101 + relationType = "RPC Sent";
100 } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) { 102 } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) {
101 relationType = "RPC Delivered"; 103 relationType = "RPC Delivered";
102 } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) { 104 } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) {
103 relationType = "RPC Successful"; 105 relationType = "RPC Successful";
104 } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) { 106 } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) {
105 relationType = "RPC Timeout"; 107 relationType = "RPC Timeout";
  108 + } else if (msg.getType().equals(DataConstants.RPC_EXPIRED)) {
  109 + relationType = "RPC Expired";
106 } else if (msg.getType().equals(DataConstants.RPC_FAILED)) { 110 } else if (msg.getType().equals(DataConstants.RPC_FAILED)) {
107 relationType = "RPC Failed"; 111 relationType = "RPC Failed";
108 } else if (msg.getType().equals(DataConstants.RPC_DELETED)) { 112 } else if (msg.getType().equals(DataConstants.RPC_DELETED)) {
@@ -89,9 +89,12 @@ public class TbSendRPCRequestNode implements TbNode { @@ -89,9 +89,12 @@ public class TbSendRPCRequestNode implements TbNode {
89 tmp = msg.getMetaData().getValue("originServiceId"); 89 tmp = msg.getMetaData().getValue("originServiceId");
90 String originServiceId = !StringUtils.isEmpty(tmp) ? tmp : null; 90 String originServiceId = !StringUtils.isEmpty(tmp) ? tmp : null;
91 91
92 - tmp = msg.getMetaData().getValue("expirationTime"); 92 + tmp = msg.getMetaData().getValue(DataConstants.EXPIRATION_TIME);
93 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds())); 93 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
94 94
  95 + tmp = msg.getMetaData().getValue(DataConstants.RETRIES);
  96 + Integer retries = !StringUtils.isEmpty(tmp) ? Integer.parseInt(tmp) : null;
  97 +
95 String params; 98 String params;
96 JsonElement paramsEl = json.get("params"); 99 JsonElement paramsEl = json.get("params");
97 if (paramsEl.isJsonPrimitive()) { 100 if (paramsEl.isJsonPrimitive()) {
@@ -112,6 +115,7 @@ public class TbSendRPCRequestNode implements TbNode { @@ -112,6 +115,7 @@ public class TbSendRPCRequestNode implements TbNode {
112 .requestUUID(requestUUID) 115 .requestUUID(requestUUID)
113 .originServiceId(originServiceId) 116 .originServiceId(originServiceId)
114 .expirationTime(expirationTime) 117 .expirationTime(expirationTime)
  118 + .retries(retries)
115 .restApiCall(restApiCall) 119 .restApiCall(restApiCall)
116 .persisted(persisted) 120 .persisted(persisted)
117 .additionalInfo(additionalInfo) 121 .additionalInfo(additionalInfo)