Commit e5b988a297e018a6c58f5a484fddf65b3525d0e0

Authored by Andrii Shvaika
2 parents 7eb55b90 d49bee4b

Merge branch 'rpc-improvements' of https://github.com/YevhenBondarenko/thingsboa…

…rd into develop/3.3.1
Showing 26 changed files with 340 additions and 119 deletions
... ... @@ -400,6 +400,14 @@ public class ActorSystemContext {
400 400 @Getter
401 401 private String debugPerTenantLimitsConfiguration;
402 402
  403 + @Value("${actors.rpc.sequence.enabled:false}")
  404 + @Getter
  405 + private boolean rpcSequenceEnabled;
  406 +
  407 + @Value("${actors.rpc.max_retries:5}")
  408 + @Getter
  409 + private int maxRpcRetries;
  410 +
403 411 @Getter
404 412 @Setter
405 413 private TbActorSystem actorSystem;
... ...
... ... @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
48 48 import org.thingsboard.server.common.data.kv.KvEntry;
49 49 import org.thingsboard.server.common.data.page.PageData;
50 50 import org.thingsboard.server.common.data.page.PageLink;
  51 +import org.thingsboard.server.common.data.page.SortOrder;
51 52 import org.thingsboard.server.common.data.relation.EntityRelation;
52 53 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
53 54 import org.thingsboard.server.common.data.rpc.Rpc;
... ... @@ -79,9 +80,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
79 80 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
80 81 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
81 82 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
82   -import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg;
83 83 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
84 84 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
  85 +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseStatusMsg;
85 86 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
86 87 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
87 88 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
... ... @@ -98,9 +99,11 @@ import java.util.Arrays;
98 99 import java.util.Collections;
99 100 import java.util.HashMap;
100 101 import java.util.HashSet;
  102 +import java.util.LinkedHashMap;
101 103 import java.util.List;
102 104 import java.util.Map;
103 105 import java.util.Objects;
  106 +import java.util.Optional;
104 107 import java.util.Set;
105 108 import java.util.UUID;
106 109 import java.util.function.Consumer;
... ... @@ -119,6 +122,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
119 122 private final Map<UUID, SessionInfo> attributeSubscriptions;
120 123 private final Map<UUID, SessionInfo> rpcSubscriptions;
121 124 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
  125 + private final boolean rpcSequenceEnabled;
122 126
123 127 private int rpcSeq = 0;
124 128 private String deviceName;
... ... @@ -130,9 +134,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
130 134 super(systemContext);
131 135 this.tenantId = tenantId;
132 136 this.deviceId = deviceId;
  137 + this.rpcSequenceEnabled = systemContext.isRpcSequenceEnabled();
133 138 this.attributeSubscriptions = new HashMap<>();
134 139 this.rpcSubscriptions = new HashMap<>();
135   - this.toDeviceRpcPendingMap = new HashMap<>();
  140 + this.toDeviceRpcPendingMap = new LinkedHashMap<>();
136 141 this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit);
137 142 if (initAttributes()) {
138 143 restoreSessions();
... ... @@ -183,19 +188,19 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
183 188 if (timeout <= 0) {
184 189 log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
185 190 if (persisted) {
186   - createRpc(request, RpcStatus.TIMEOUT);
  191 + createRpc(request, RpcStatus.EXPIRED);
187 192 }
188 193 return;
189 194 } else if (persisted) {
190 195 createRpc(request, RpcStatus.QUEUED);
191 196 }
192 197
193   - boolean sent;
  198 + boolean sent = false;
194 199 if (systemContext.isEdgesEnabled() && edgeId != null) {
195 200 log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId());
196 201 saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
197 202 sent = true;
198   - } else {
  203 + } else if (isSendNewRpcAvailable()) {
199 204 sent = rpcSubscriptions.size() > 0;
200 205 Set<UUID> syncSessionSet = new HashSet<>();
201 206 rpcSubscriptions.forEach((key, value) -> {
... ... @@ -227,6 +232,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
227 232 }
228 233 }
229 234
  235 + private boolean isSendNewRpcAvailable() {
  236 + return !rpcSequenceEnabled || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
  237 + }
  238 +
230 239 private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
231 240 Rpc rpc = new Rpc(new RpcId(request.getId()));
232 241 rpc.setCreatedTime(System.currentTimeMillis());
... ... @@ -266,16 +275,26 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
266 275
267 276 void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) {
268 277 log.debug("[{}] Processing remove rpc command", msg.getRequestId());
269   - Integer requestId = null;
270   - for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry : toDeviceRpcPendingMap.entrySet()) {
271   - if (entry.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
272   - requestId = entry.getKey();
  278 + Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry = null;
  279 + for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> e : toDeviceRpcPendingMap.entrySet()) {
  280 + if (e.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
  281 + entry = e;
273 282 break;
274 283 }
275 284 }
276 285
277   - if (requestId != null) {
278   - toDeviceRpcPendingMap.remove(requestId);
  286 + if (entry != null) {
  287 + if (entry.getValue().isDelivered()) {
  288 + toDeviceRpcPendingMap.remove(entry.getKey());
  289 + } else {
  290 + Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> firstRpc = getFirstRpc();
  291 + if (firstRpc.isPresent() && entry.getKey().equals(firstRpc.get().getKey())) {
  292 + toDeviceRpcPendingMap.remove(entry.getKey());
  293 + sendNextPendingRequest(context);
  294 + } else {
  295 + toDeviceRpcPendingMap.remove(entry.getKey());
  296 + }
  297 + }
279 298 }
280 299 }
281 300
... ... @@ -290,14 +309,17 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
290 309 if (requestMd != null) {
291 310 log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
292 311 if (requestMd.getMsg().getMsg().isPersisted()) {
293   - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.TIMEOUT, null);
  312 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.EXPIRED, null);
294 313 }
295 314 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
296 315 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
  316 + if (!requestMd.isDelivered()) {
  317 + sendNextPendingRequest(context);
  318 + }
297 319 }
298 320 }
299 321
300   - private void sendPendingRequests(TbActorCtx context, UUID sessionId, SessionInfoProto sessionInfo) {
  322 + private void sendPendingRequests(TbActorCtx context, UUID sessionId, String nodeId) {
301 323 SessionType sessionType = getSessionType(sessionId);
302 324 if (!toDeviceRpcPendingMap.isEmpty()) {
303 325 log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
... ... @@ -309,20 +331,33 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
309 331 log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
310 332 }
311 333 Set<Integer> sentOneWayIds = new HashSet<>();
312   - if (sessionType == SessionType.ASYNC) {
313   - toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
  334 +
  335 + if (rpcSequenceEnabled) {
  336 + getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  337 + } else if (sessionType == SessionType.ASYNC) {
  338 + toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
314 339 } else {
315   - toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));
  340 + toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
316 341 }
317 342
318 343 sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove);
319 344 }
320 345
  346 + private Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> getFirstRpc() {
  347 + return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst();
  348 + }
  349 +
  350 + private void sendNextPendingRequest(TbActorCtx context) {
  351 + if (rpcSequenceEnabled) {
  352 + rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId()));
  353 + }
  354 + }
  355 +
321 356 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
322 357 return entry -> {
323 358 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
324 359 ToDeviceRpcRequestBody body = request.getBody();
325   - if (request.isOneway()) {
  360 + if (request.isOneway() && !rpcSequenceEnabled) {
326 361 sentOneWayIds.add(entry.getKey());
327 362 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null));
328 363 }
... ... @@ -355,7 +390,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
355 390 processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
356 391 }
357 392 if (msg.hasSendPendingRPC()) {
358   - sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo);
  393 + sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo.getNodeId());
359 394 }
360 395 if (msg.hasGetAttributes()) {
361 396 handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
... ... @@ -369,8 +404,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
369 404 if (msg.hasClaimDevice()) {
370 405 handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
371 406 }
372   - if (msg.hasPersistedRpcResponseMsg()) {
373   - processPersistedRpcResponses(context, sessionInfo, msg.getPersistedRpcResponseMsg());
  407 + if (msg.hasRpcResponseStatusMsg()) {
  408 + processRpcResponseStatus(context, sessionInfo, msg.getRpcResponseStatusMsg());
374 409 }
375 410 if (msg.hasUplinkNotificationMsg()) {
376 411 processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
... ... @@ -530,38 +565,63 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
530 565 boolean success = requestMd != null;
531 566 if (success) {
532 567 boolean hasError = StringUtils.isNotEmpty(responseMsg.getError());
533   - String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
534   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(
535   - new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
536   - payload, null));
537   - if (requestMd.getMsg().getMsg().isPersisted()) {
538   - RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
539   - JsonNode response;
540   - try {
541   - response = JacksonUtil.toJsonNode(payload);
542   - } catch (IllegalArgumentException e) {
543   - response = JacksonUtil.newObjectNode().put("error", payload);
  568 + try {
  569 + String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
  570 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(
  571 + new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  572 + payload, null));
  573 + if (requestMd.getMsg().getMsg().isPersisted()) {
  574 + RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
  575 + JsonNode response;
  576 + try {
  577 + response = JacksonUtil.toJsonNode(payload);
  578 + } catch (IllegalArgumentException e) {
  579 + response = JacksonUtil.newObjectNode().put("error", payload);
  580 + }
  581 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
  582 + }
  583 + } finally {
  584 + if (hasError) {
  585 + sendNextPendingRequest(context);
544 586 }
545   - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
546 587 }
547 588 } else {
548 589 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
549 590 }
550 591 }
551 592
552   - private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) {
  593 + private void processRpcResponseStatus(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseStatusMsg responseMsg) {
553 594 UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
554 595 RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus());
555   -
556   - ToDeviceRpcRequestMetadata md;
557   - if (RpcStatus.DELIVERED.equals(status)) {
558   - md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
559   - } else {
560   - md = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
561   - }
  596 + ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
562 597
563 598 if (md != null) {
564   - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null);
  599 + if (status.equals(RpcStatus.DELIVERED)) {
  600 + if (md.getMsg().getMsg().isOneway()) {
  601 + toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  602 + if (rpcSequenceEnabled) {
  603 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
  604 + }
  605 + } else {
  606 + md.setDelivered(true);
  607 + }
  608 + } else if (status.equals(RpcStatus.TIMEOUT)) {
  609 + Integer maxRpcRetries = md.getMsg().getMsg().getRetries();
  610 + maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
  611 + if (maxRpcRetries <= md.getRetries()) {
  612 + toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  613 + status = RpcStatus.FAILED;
  614 + } else {
  615 + md.setRetries(md.getRetries() + 1);
  616 + }
  617 + }
  618 +
  619 + if (md.getMsg().getMsg().isPersisted()) {
  620 + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null);
  621 + }
  622 + if (status != RpcStatus.SENT) {
  623 + sendNextPendingRequest(context);
  624 + }
565 625 } else {
566 626 log.info("[{}][{}] Rpc has already removed from pending map.", deviceId, rpcId);
567 627 }
... ... @@ -601,7 +661,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
601 661 sessionMD.setSubscribedToRPC(true);
602 662 log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
603 663 rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
604   - sendPendingRequests(context, sessionId, sessionInfo);
  664 + sendPendingRequests(context, sessionId, sessionInfo.getNodeId());
605 665 dumpSessions();
606 666 }
607 667 }
... ... @@ -871,7 +931,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
871 931
872 932 void init(TbActorCtx ctx) {
873 933 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
874   - PageLink pageLink = new PageLink(1024);
  934 + PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
875 935 PageData<Rpc> pageData;
876 936 do {
877 937 pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
... ... @@ -879,7 +939,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
879 939 ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
880 940 long timeout = rpc.getExpirationTime() - System.currentTimeMillis();
881 941 if (timeout <= 0) {
882   - rpc.setStatus(RpcStatus.TIMEOUT);
  942 + rpc.setStatus(RpcStatus.EXPIRED);
883 943 systemContext.getTbRpcService().save(tenantId, rpc);
884 944 } else {
885 945 registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
... ...
... ... @@ -25,4 +25,6 @@ import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
25 25 public class ToDeviceRpcRequestMetadata {
26 26 private final ToDeviceRpcRequestActorMsg msg;
27 27 private final boolean sent;
  28 + private int retries;
  29 + private boolean delivered;
28 30 }
... ...
... ... @@ -75,11 +75,12 @@ public abstract class AbstractRpcController extends BaseController {
75 75 SecurityUser currentUser = getCurrentUser();
76 76 TenantId tenantId = currentUser.getTenantId();
77 77 final DeferredResult<ResponseEntity> response = new DeferredResult<>();
78   - long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout;
79   - long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout);
  78 + long timeout = rpcRequestBody.has(DataConstants.TIMEOUT) ? rpcRequestBody.get(DataConstants.TIMEOUT).asLong() : defaultTimeout;
  79 + long expTime = rpcRequestBody.has(DataConstants.EXPIRATION_TIME) ? rpcRequestBody.get(DataConstants.EXPIRATION_TIME).asLong() : System.currentTimeMillis() + Math.max(minTimeout, timeout);
80 80 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
81 81 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean();
82 82 String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO));
  83 + Integer retries = rpcRequestBody.has(DataConstants.RETRIES) ? rpcRequestBody.get(DataConstants.RETRIES).asInt() : null;
83 84 accessValidator.validate(currentUser, Operation.RPC_CALL, deviceId, new HttpValidationCallback(response, new FutureCallback<>() {
84 85 @Override
85 86 public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
... ... @@ -90,6 +91,7 @@ public abstract class AbstractRpcController extends BaseController {
90 91 expTime,
91 92 body,
92 93 persisted,
  94 + retries,
93 95 additionalInfo
94 96 );
95 97 deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus), currentUser);
... ...
... ... @@ -166,6 +166,11 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
166 166 metaData.putValue("oneway", Boolean.toString(msg.isOneway()));
167 167 metaData.putValue(DataConstants.PERSISTENT, Boolean.toString(msg.isPersisted()));
168 168
  169 + if (msg.getRetries() != null) {
  170 + metaData.putValue(DataConstants.RETRIES, msg.getRetries().toString());
  171 + }
  172 +
  173 +
169 174 Device device = deviceService.findDeviceById(msg.getTenantId(), msg.getDeviceId());
170 175 if (device != null) {
171 176 metaData.putValue("deviceName", device.getName());
... ...
... ... @@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
101 101 @Override
102 102 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
103 103 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
104   - src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo());
  104 + src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getRetries(), src.getAdditionalInfo());
105 105 forwardRpcRequestToDeviceActor(request, response -> {
106 106 if (src.isRestApiCall()) {
107 107 sendRpcResponseToTbCore(src.getOriginServiceId(), response);
... ...
... ... @@ -326,6 +326,10 @@ actors:
326 326 queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}"
327 327 # Time in milliseconds for transaction to complete
328 328 duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}"
  329 + rpc:
  330 + max_retries: "${ACTORS_RPC_MAX_RETRIES:5}"
  331 + sequence:
  332 + enabled: "${ACTORS_RPC_SEQUENCE_ENABLED:false}"
329 333 statistics:
330 334 # Enable/disable actor statistics
331 335 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
... ...
... ... @@ -90,6 +90,11 @@ public abstract class AbstractMqttServerSideRpcDefaultIntegrationTest extends Ab
90 90 }
91 91
92 92 @Test
  93 + public void testSequenceServerMqttTwoWayRpc() throws Exception {
  94 + processSequenceTwoWayRpcTest();
  95 + }
  96 +
  97 + @Test
93 98 public void testGatewayServerMqttOneWayRpc() throws Exception {
94 99 processOneWayRpcTestGateway("Gateway Device OneWay RPC");
95 100 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.transport.mqtt.rpc;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
  19 +import com.fasterxml.jackson.databind.node.ObjectNode;
19 20 import com.google.protobuf.InvalidProtocolBufferException;
20 21 import com.nimbusds.jose.util.StandardCharset;
21 22 import io.netty.handler.codec.mqtt.MqttQoS;
... ... @@ -27,13 +28,16 @@ import org.eclipse.paho.client.mqttv3.MqttCallback;
27 28 import org.eclipse.paho.client.mqttv3.MqttException;
28 29 import org.eclipse.paho.client.mqttv3.MqttMessage;
29 30 import org.junit.Assert;
  31 +import org.thingsboard.common.util.JacksonUtil;
30 32 import org.thingsboard.server.common.data.Device;
31 33 import org.thingsboard.server.common.data.TransportPayloadType;
32 34 import org.thingsboard.server.common.data.device.profile.MqttTopics;
33   -import org.thingsboard.common.util.JacksonUtil;
34 35 import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
35 36
  37 +import java.util.ArrayList;
36 38 import java.util.Arrays;
  39 +import java.util.List;
  40 +import java.util.concurrent.CopyOnWriteArrayList;
37 41 import java.util.concurrent.CountDownLatch;
38 42 import java.util.concurrent.TimeUnit;
39 43
... ... @@ -101,6 +105,32 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
101 105 Assert.assertEquals(expected, result);
102 106 }
103 107
  108 + protected void processSequenceTwoWayRpcTest() throws Exception {
  109 + List<String> expected = new ArrayList<>();
  110 + List<String> result = new ArrayList<>();
  111 +
  112 + String deviceId = savedDevice.getId().getId().toString();
  113 +
  114 + for (int i = 0; i < 10; i++) {
  115 + ObjectNode request = JacksonUtil.newObjectNode();
  116 + request.put("method", "test");
  117 + request.put("params", i);
  118 + expected.add(JacksonUtil.toString(request));
  119 + request.put("persistent", true);
  120 + doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk());
  121 + }
  122 +
  123 + MqttAsyncClient client = getMqttAsyncClient(accessToken);
  124 + client.setManualAcks(true);
  125 + CountDownLatch latch = new CountDownLatch(10);
  126 + TestSequenceMqttCallback callback = new TestSequenceMqttCallback(client, latch, result);
  127 + client.setCallback(callback);
  128 + client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
  129 +
  130 + latch.await(10, TimeUnit.SECONDS);
  131 + Assert.assertEquals(expected, result);
  132 + }
  133 +
104 134 protected void processTwoWayRpcTestGateway(String deviceName) throws Exception {
105 135 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
106 136
... ... @@ -213,4 +243,38 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
213 243
214 244 }
215 245 }
  246 +
  247 + protected class TestSequenceMqttCallback implements MqttCallback {
  248 +
  249 + private final MqttAsyncClient client;
  250 + private final CountDownLatch latch;
  251 + private final List<String> expected;
  252 +
  253 + TestSequenceMqttCallback(MqttAsyncClient client, CountDownLatch latch, List<String> expected) {
  254 + this.client = client;
  255 + this.latch = latch;
  256 + this.expected = expected;
  257 + }
  258 +
  259 + @Override
  260 + public void connectionLost(Throwable throwable) {
  261 + }
  262 +
  263 + @Override
  264 + public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception {
  265 + log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload()));
  266 + expected.add(new String(mqttMessage.getPayload()));
  267 + String responseTopic = requestTopic.replace("request", "response");
  268 + var qoS = mqttMessage.getQos();
  269 +
  270 + client.messageArrivedComplete(mqttMessage.getId(), qoS);
  271 + client.publish(responseTopic, processMessageArrived(requestTopic, mqttMessage));
  272 + latch.countDown();
  273 + }
  274 +
  275 + @Override
  276 + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  277 +
  278 + }
  279 + }
216 280 }
... ...
... ... @@ -346,7 +346,7 @@ message UplinkNotificationMsg {
346 346 int64 uplinkTs = 1;
347 347 }
348 348
349   -message ToDevicePersistedRpcResponseMsg {
  349 +message ToDeviceRpcResponseStatusMsg {
350 350 int32 requestId = 1;
351 351 int64 requestIdMSB = 2;
352 352 int64 requestIdLSB = 3;
... ... @@ -456,7 +456,7 @@ message TransportToDeviceActorMsg {
456 456 SubscriptionInfoProto subscriptionInfo = 7;
457 457 ClaimDeviceMsg claimDevice = 8;
458 458 ProvisionDeviceRequestMsg provisionDevice = 9;
459   - ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10;
  459 + ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10;
460 460 SendPendingRPCMsg sendPendingRPC = 11;
461 461 UplinkNotificationMsg uplinkNotificationMsg = 12;
462 462 }
... ...
... ... @@ -36,7 +36,10 @@ public class DataConstants {
36 36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats";
37 37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration";
38 38 public static final String PERSISTENT = "persistent";
  39 + public static final String TIMEOUT = "timeout";
  40 + public static final String EXPIRATION_TIME = "expirationTime";
39 41 public static final String ADDITIONAL_INFO = "additionalInfo";
  42 + public static final String RETRIES = "retries";
40 43 public static final String COAP_TRANSPORT_NAME = "COAP";
41 44 public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
42 45 public static final String MQTT_TRANSPORT_NAME = "MQTT";
... ... @@ -85,9 +88,11 @@ public class DataConstants {
85 88 public static final String RPC_CALL_FROM_SERVER_TO_DEVICE = "RPC_CALL_FROM_SERVER_TO_DEVICE";
86 89
87 90 public static final String RPC_QUEUED = "RPC_QUEUED";
  91 + public static final String RPC_SENT = "RPC_SENT";
88 92 public static final String RPC_DELIVERED = "RPC_DELIVERED";
89 93 public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL";
90 94 public static final String RPC_TIMEOUT = "RPC_TIMEOUT";
  95 + public static final String RPC_EXPIRED = "RPC_EXPIRED";
91 96 public static final String RPC_FAILED = "RPC_FAILED";
92 97 public static final String RPC_DELETED = "RPC_DELETED";
93 98
... ...
... ... @@ -16,5 +16,5 @@
16 16 package org.thingsboard.server.common.data.rpc;
17 17
18 18 public enum RpcStatus {
19   - QUEUED, DELIVERED, SUCCESSFUL, TIMEOUT, FAILED
  19 + QUEUED, SENT, DELIVERED, SUCCESSFUL, TIMEOUT, EXPIRED, FAILED
20 20 }
... ...
... ... @@ -36,6 +36,7 @@ public class ToDeviceRpcRequest implements Serializable {
36 36 private final long expirationTime;
37 37 private final ToDeviceRpcRequestBody body;
38 38 private final boolean persisted;
  39 + private final Integer retries;
39 40 @JsonIgnore
40 41 private final String additionalInfo;
41 42 }
... ...
... ... @@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo
42 42 import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
43 43 import org.thingsboard.server.common.data.id.DeviceId;
44 44 import org.thingsboard.server.common.data.id.DeviceProfileId;
  45 +import org.thingsboard.server.common.data.rpc.RpcStatus;
45 46 import org.thingsboard.server.common.msg.session.FeatureType;
46 47 import org.thingsboard.server.common.msg.session.SessionMsgType;
47 48 import org.thingsboard.server.common.transport.SessionMsgListener;
... ... @@ -192,29 +193,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
192 193 client.lock();
193 194 try {
194 195 long uplinkTime = client.updateLastUplinkTime(uplinkTs);
195   - long timeout;
196   - if (PowerMode.PSM.equals(powerMode)) {
197   - Long psmActivityTimer = client.getPsmActivityTimer();
198   - if (psmActivityTimer == null && profileSettings != null) {
199   - psmActivityTimer = profileSettings.getPsmActivityTimer();
200   -
201   - }
202   - if (psmActivityTimer == null || psmActivityTimer == 0L) {
203   - psmActivityTimer = config.getPsmActivityTimer();
204   - }
205   -
206   - timeout = psmActivityTimer;
207   - } else {
208   - Long pagingTransmissionWindow = client.getPagingTransmissionWindow();
209   - if (pagingTransmissionWindow == null && profileSettings != null) {
210   - pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow();
211   -
212   - }
213   - if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) {
214   - pagingTransmissionWindow = config.getPagingTransmissionWindow();
215   - }
216   - timeout = pagingTransmissionWindow;
217   - }
  196 + long timeout = getTimeout(client, powerMode, profileSettings);
218 197 Future<Void> sleepTask = client.getSleepTask();
219 198 if (sleepTask != null) {
220 199 sleepTask.cancel(false);
... ... @@ -234,6 +213,33 @@ public class DefaultCoapClientContext implements CoapClientContext {
234 213 }
235 214 }
236 215
  216 + private long getTimeout(TbCoapClientState client, PowerMode powerMode, PowerSavingConfiguration profileSettings) {
  217 + long timeout;
  218 + if (PowerMode.PSM.equals(powerMode)) {
  219 + Long psmActivityTimer = client.getPsmActivityTimer();
  220 + if (psmActivityTimer == null && profileSettings != null) {
  221 + psmActivityTimer = profileSettings.getPsmActivityTimer();
  222 +
  223 + }
  224 + if (psmActivityTimer == null || psmActivityTimer == 0L) {
  225 + psmActivityTimer = config.getPsmActivityTimer();
  226 + }
  227 +
  228 + timeout = psmActivityTimer;
  229 + } else {
  230 + Long pagingTransmissionWindow = client.getPagingTransmissionWindow();
  231 + if (pagingTransmissionWindow == null && profileSettings != null) {
  232 + pagingTransmissionWindow = profileSettings.getPagingTransmissionWindow();
  233 +
  234 + }
  235 + if (pagingTransmissionWindow == null || pagingTransmissionWindow == 0L) {
  236 + pagingTransmissionWindow = config.getPagingTransmissionWindow();
  237 + }
  238 + timeout = pagingTransmissionWindow;
  239 + }
  240 + return timeout;
  241 + }
  242 +
237 243 private boolean registerFeatureObservation(TbCoapClientState state, String token, CoapExchange exchange, FeatureType featureType) {
238 244 state.lock();
239 245 try {
... ... @@ -524,17 +530,38 @@ public class DefaultCoapClientContext implements CoapClientContext {
524 530 Response response = state.getAdaptor().convertToPublish(conRequest, msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder());
525 531 int requestId = getNextMsgId();
526 532 response.setMID(requestId);
527   - if (msg.getPersisted() && conRequest) {
  533 + if (conRequest) {
  534 + PowerMode powerMode = state.getPowerMode();
  535 + PowerSavingConfiguration profileSettings = null;
  536 + if (powerMode == null) {
  537 + var clientProfile = getProfile(state.getProfileId());
  538 + if (clientProfile.isPresent()) {
  539 + profileSettings = clientProfile.get().getClientSettings();
  540 + if (profileSettings != null) {
  541 + powerMode = profileSettings.getPowerMode();
  542 + }
  543 + }
  544 + }
  545 +
528 546 transportContext.getRpcAwaitingAck().put(requestId, msg);
529 547 transportContext.getScheduler().schedule(() -> {
530   - transportContext.getRpcAwaitingAck().remove(requestId);
531   - }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
  548 + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(requestId);
  549 + if (rpcRequestMsg != null) {
  550 + transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  551 + }
  552 + }, Math.min(getTimeout(state, powerMode, profileSettings), msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
  553 +
532 554 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
533 555 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
534 556 if (rpcRequestMsg != null) {
535   - transportService.process(state.getSession(), rpcRequestMsg, TransportServiceCallback.EMPTY);
  557 + transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
536 558 }
537   - }, null));
  559 + }, id -> {
  560 + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
  561 + if (rpcRequestMsg != null) {
  562 + transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  563 + }
  564 + }));
538 565 }
539 566 if (conRequest) {
540 567 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
... ... @@ -553,8 +580,12 @@ public class DefaultCoapClientContext implements CoapClientContext {
553 580 transportService.process(state.getSession(),
554 581 TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
555 582 .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY);
556   - } else if (msg.getPersisted() && !conRequest && sent) {
557   - transportService.process(state.getSession(), msg, TransportServiceCallback.EMPTY);
  583 + } else if (sent) {
  584 + if (!conRequest) {
  585 + transportService.process(state.getSession(), msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  586 + } else if (msg.getPersisted()) {
  587 + transportService.process(state.getSession(), msg, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  588 + }
558 589 }
559 590 }
560 591 }
... ... @@ -723,7 +754,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
723 754 private void cancelRpcSubscription(TbCoapClientState state) {
724 755 if (state.getRpc() != null) {
725 756 clientsByToken.remove(state.getRpc().getToken());
726   - CoapExchange exchange = state.getAttrs().getExchange();
  757 + CoapExchange exchange = state.getRpc().getExchange();
727 758 state.setRpc(null);
728 759 transportService.process(state.getSession(),
729 760 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
... ...
... ... @@ -407,7 +407,7 @@ public class DeviceApiController implements TbTransportService {
407 407 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg msg) {
408 408 log.trace("[{}] Received RPC command to device", sessionId);
409 409 responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg, true).toString(), HttpStatus.OK));
410   - transportService.process(sessionInfo, msg, TransportServiceCallback.EMPTY);
  410 + transportService.process(sessionInfo, msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
411 411 }
412 412
413 413 @Override
... ...
... ... @@ -21,7 +21,9 @@ import org.eclipse.leshan.core.ResponseCode;
21 21 import org.springframework.stereotype.Service;
22 22 import org.thingsboard.common.util.JacksonUtil;
23 23 import org.thingsboard.server.common.data.StringUtils;
  24 +import org.thingsboard.server.common.data.rpc.RpcStatus;
24 25 import org.thingsboard.server.common.transport.TransportService;
  26 +import org.thingsboard.server.common.transport.TransportServiceCallback;
25 27 import org.thingsboard.server.gen.transport.TransportProtos;
26 28 import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
27 29 import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
... ... @@ -158,6 +160,7 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler {
158 160 throw new IllegalArgumentException("Unsupported operation: " + operationType.name());
159 161 }
160 162 }
  163 + transportService.process(client.getSession(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
161 164 } catch (IllegalArgumentException e) {
162 165 this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage());
163 166 }
... ...
... ... @@ -19,6 +19,7 @@ import org.eclipse.leshan.core.ResponseCode;
19 19 import org.eclipse.leshan.core.request.exception.ClientSleepingException;
20 20 import org.thingsboard.common.util.JacksonUtil;
21 21 import org.thingsboard.server.common.data.StringUtils;
  22 +import org.thingsboard.server.common.data.rpc.RpcStatus;
22 23 import org.thingsboard.server.common.transport.TransportService;
23 24 import org.thingsboard.server.common.transport.TransportServiceCallback;
24 25 import org.thingsboard.server.gen.transport.TransportProtos;
... ... @@ -44,7 +45,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
44 45
45 46 @Override
46 47 public void onSuccess(R request, T response) {
47   - transportService.process(client.getSession(), this.request, TransportServiceCallback.EMPTY);
  48 + transportService.process(client.getSession(), this.request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
48 49 sendRpcReplyOnSuccess(response);
49 50 if (callback != null) {
50 51 callback.onSuccess(request, response);
... ... @@ -61,7 +62,9 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
61 62
62 63 @Override
63 64 public void onError(String params, Exception e) {
64   - if (!(e instanceof TimeoutException || e instanceof ClientSleepingException)) {
  65 + if (e instanceof TimeoutException) {
  66 + transportService.process(client.getSession(), this.request, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  67 + } else if (!(e instanceof ClientSleepingException)) {
65 68 sendRpcReplyOnError(e);
66 69 }
67 70 if (callback != null) {
... ...
... ... @@ -68,4 +68,7 @@ public class MqttTransportContext extends TransportContext {
68 68 @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}")
69 69 private int messageQueueSizePerDeviceLimit;
70 70
  71 + @Getter
  72 + @Value("${transport.mqtt.timeout:10000}")
  73 + private long timeout;
71 74 }
... ...
... ... @@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.TransportPayloadType;
50 50 import org.thingsboard.server.common.data.device.profile.MqttTopics;
51 51 import org.thingsboard.server.common.data.id.OtaPackageId;
52 52 import org.thingsboard.server.common.data.ota.OtaPackageType;
  53 +import org.thingsboard.server.common.data.rpc.RpcStatus;
53 54 import org.thingsboard.server.common.msg.EncryptionUtil;
54 55 import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
55 56 import org.thingsboard.server.common.transport.SessionMsgListener;
... ... @@ -272,7 +273,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
272 273 int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
273 274 TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
274 275 if (rpcRequest != null) {
275   - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY);
  276 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
276 277 }
277 278 break;
278 279 default:
... ... @@ -849,20 +850,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
849 850 try {
850 851 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
851 852 int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
852   - if (rpcRequest.getPersisted() && isAckExpected(payload)) {
  853 + if (isAckExpected(payload)) {
853 854 rpcAwaitingAck.put(msgId, rpcRequest);
854 855 context.getScheduler().schedule(() -> {
855   - rpcAwaitingAck.remove(msgId);
856   - }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
  856 + TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
  857 + if (msg != null) {
  858 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  859 + }
  860 + }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
857 861 }
858 862 var cf = publish(payload, deviceSessionCtx);
859   - if (rpcRequest.getPersisted() && !isAckExpected(payload)) {
860   - cf.addListener(result -> {
861   - if (result.cause() == null) {
862   - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, TransportServiceCallback.EMPTY);
  863 + cf.addListener(result -> {
  864 + if (result.cause() == null) {
  865 + if (!isAckExpected(payload)) {
  866 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  867 + } else if (rpcRequest.getPersisted()) {
  868 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
863 869 }
864   - });
865   - }
  870 + } else {
  871 + // TODO: send error
  872 + }
  873 + });
866 874 });
867 875 } catch (Exception e) {
868 876 transportService.process(deviceSessionCtx.getSessionInfo(),
... ...
... ... @@ -16,8 +16,10 @@
16 16 package org.thingsboard.server.transport.mqtt.session;
17 17
18 18 import io.netty.channel.ChannelFuture;
  19 +import io.netty.handler.codec.mqtt.MqttMessage;
19 20 import lombok.extern.slf4j.Slf4j;
20 21 import org.thingsboard.server.common.data.DeviceProfile;
  22 +import org.thingsboard.server.common.data.rpc.RpcStatus;
21 23 import org.thingsboard.server.common.transport.SessionMsgListener;
22 24 import org.thingsboard.server.common.transport.TransportService;
23 25 import org.thingsboard.server.common.transport.TransportServiceCallback;
... ... @@ -102,9 +104,14 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
102 104 payload -> {
103 105 ChannelFuture channelFuture = parent.writeAndFlush(payload);
104 106 if (request.getPersisted()) {
105   - channelFuture.addListener(future -> {
106   - if (future.cause() == null) {
107   - transportService.process(getSessionInfo(), request, TransportServiceCallback.EMPTY);
  107 + channelFuture.addListener(result -> {
  108 + if (result.cause() == null) {
  109 + if (!isAckExpected(payload)) {
  110 + transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  111 + } else if (request.getPersisted()) {
  112 + transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
  113 +
  114 + }
108 115 }
109 116 });
110 117 }
... ... @@ -129,4 +136,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
129 136 // This feature is not supported in the TB IoT Gateway yet.
130 137 }
131 138
  139 + private boolean isAckExpected(MqttMessage message) {
  140 + return message.fixedHeader().qosLevel().value() > 0;
  141 + }
  142 +
132 143 }
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
26 26 import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
27 27 import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
28 28 import org.thingsboard.server.common.data.id.DeviceId;
  29 +import org.thingsboard.server.common.data.rpc.RpcStatus;
29 30 import org.thingsboard.server.common.transport.SessionMsgListener;
30 31 import org.thingsboard.server.common.transport.TransportServiceCallback;
31 32 import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
... ... @@ -142,7 +143,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
142 143 public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
143 144 log.trace("[{}] Received RPC command to device", sessionId);
144 145 snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
145   - snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, TransportServiceCallback.EMPTY);
  146 + snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
146 147 }
147 148
148 149 @Override
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport;
17 17
18 18 import org.thingsboard.server.common.data.DeviceProfile;
19 19 import org.thingsboard.server.common.data.DeviceTransportType;
  20 +import org.thingsboard.server.common.data.rpc.RpcStatus;
20 21 import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
21 22 import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
22 23 import org.thingsboard.server.common.transport.service.SessionMetaData;
... ... @@ -112,7 +113,7 @@ public interface TransportService {
112 113
113 114 void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
114 115
115   - void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, TransportServiceCallback<Void> callback);
  116 + void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback);
116 117
117 118 void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
118 119
... ...
... ... @@ -589,22 +589,18 @@ public class DefaultTransportService implements TransportService {
589 589 }
590 590
591 591 @Override
592   - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
593   - if (msg.getPersisted()) {
594   - RpcStatus status = msg.getOneway() ? RpcStatus.SUCCESSFUL : RpcStatus.DELIVERED;
595   -
596   - TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()
597   - .setRequestId(msg.getRequestId())
598   - .setRequestIdLSB(msg.getRequestIdLSB())
599   - .setRequestIdMSB(msg.getRequestIdMSB())
600   - .setStatus(status.name())
601   - .build();
602   -
603   - if (checkLimits(sessionInfo, responseMsg, callback)) {
604   - reportActivityInternal(sessionInfo);
605   - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPersistedRpcResponseMsg(responseMsg).build(),
606   - new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, TransportServiceCallback.EMPTY));
607   - }
  592 + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback) {
  593 + TransportProtos.ToDeviceRpcResponseStatusMsg responseMsg = TransportProtos.ToDeviceRpcResponseStatusMsg.newBuilder()
  594 + .setRequestId(msg.getRequestId())
  595 + .setRequestIdLSB(msg.getRequestIdLSB())
  596 + .setRequestIdMSB(msg.getRequestIdMSB())
  597 + .setStatus(rpcStatus.name())
  598 + .build();
  599 +
  600 + if (checkLimits(sessionInfo, responseMsg, callback)) {
  601 + reportActivityInternal(sessionInfo);
  602 + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setRpcResponseStatusMsg(responseMsg).build(),
  603 + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, TransportServiceCallback.EMPTY));
608 604 }
609 605 }
610 606
... ...
... ... @@ -41,5 +41,5 @@ public final class RuleEngineDeviceRpcRequest {
41 41 private final long expirationTime;
42 42 private final boolean restApiCall;
43 43 private final String additionalInfo;
44   -
  44 + private final Integer retries;
45 45 }
... ...
... ... @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
33 33 type = ComponentType.FILTER,
34 34 name = "message type switch",
35 35 configClazz = EmptyNodeConfiguration.class,
36   - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", "RPC Deleted",
  36 + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Expired", "RPC Failed", "RPC Deleted",
37 37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
38 38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
39 39 "Timeseries Updated", "Timeseries Deleted"},
... ... @@ -97,12 +97,16 @@ public class TbMsgTypeSwitchNode implements TbNode {
97 97 relationType = "Timeseries Deleted";
98 98 } else if (msg.getType().equals(DataConstants.RPC_QUEUED)) {
99 99 relationType = "RPC Queued";
  100 + } else if (msg.getType().equals(DataConstants.RPC_SENT)) {
  101 + relationType = "RPC Sent";
100 102 } else if (msg.getType().equals(DataConstants.RPC_DELIVERED)) {
101 103 relationType = "RPC Delivered";
102 104 } else if (msg.getType().equals(DataConstants.RPC_SUCCESSFUL)) {
103 105 relationType = "RPC Successful";
104 106 } else if (msg.getType().equals(DataConstants.RPC_TIMEOUT)) {
105 107 relationType = "RPC Timeout";
  108 + } else if (msg.getType().equals(DataConstants.RPC_EXPIRED)) {
  109 + relationType = "RPC Expired";
106 110 } else if (msg.getType().equals(DataConstants.RPC_FAILED)) {
107 111 relationType = "RPC Failed";
108 112 } else if (msg.getType().equals(DataConstants.RPC_DELETED)) {
... ...
... ... @@ -89,9 +89,12 @@ public class TbSendRPCRequestNode implements TbNode {
89 89 tmp = msg.getMetaData().getValue("originServiceId");
90 90 String originServiceId = !StringUtils.isEmpty(tmp) ? tmp : null;
91 91
92   - tmp = msg.getMetaData().getValue("expirationTime");
  92 + tmp = msg.getMetaData().getValue(DataConstants.EXPIRATION_TIME);
93 93 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
94 94
  95 + tmp = msg.getMetaData().getValue(DataConstants.RETRIES);
  96 + Integer retries = !StringUtils.isEmpty(tmp) ? Integer.parseInt(tmp) : null;
  97 +
95 98 String params;
96 99 JsonElement paramsEl = json.get("params");
97 100 if (paramsEl.isJsonPrimitive()) {
... ... @@ -112,6 +115,7 @@ public class TbSendRPCRequestNode implements TbNode {
112 115 .requestUUID(requestUUID)
113 116 .originServiceId(originServiceId)
114 117 .expirationTime(expirationTime)
  118 + .retries(retries)
115 119 .restApiCall(restApiCall)
116 120 .persisted(persisted)
117 121 .additionalInfo(additionalInfo)
... ...