Commit 8513c999030ec6c7c960dd1f57552df343ca9bf3

Authored by YevhenBondarenko
1 parent 8869dc0c

added sequence for the all RPC

@@ -80,9 +80,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType; @@ -80,9 +80,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
80 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; 80 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
81 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; 81 import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
82 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; 82 import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
83 -import org.thingsboard.server.gen.transport.TransportProtos.ToDevicePersistedRpcResponseMsg;  
84 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg; 83 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
85 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; 84 import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
  85 +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseStatusMsg;
86 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; 86 import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
87 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 87 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
88 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto; 88 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
@@ -298,7 +298,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -298,7 +298,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
298 } 298 }
299 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), 299 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
300 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); 300 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
301 - sendNextPendingRequest(context); 301 + if (!requestMd.isDelivered()) {
  302 + sendNextPendingRequest(context);
  303 + }
302 } 304 }
303 } 305 }
304 306
@@ -315,22 +317,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -315,22 +317,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
315 } 317 }
316 Set<Integer> sentOneWayIds = new HashSet<>(); 318 Set<Integer> sentOneWayIds = new HashSet<>();
317 319
318 - if (sessionType == SessionType.ASYNC) {  
319 - if (rpcSequenceEnabled) {  
320 - List<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> entries = new ArrayList<>();  
321 - for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry : toDeviceRpcPendingMap.entrySet()) {  
322 - if (entry.getValue().isDelivered()) {  
323 - continue;  
324 - }  
325 - entries.add(entry);  
326 - if (entry.getValue().getMsg().getMsg().isPersisted() || entry.getValue().getMsg().getMsg().isOneway()) {  
327 - break;  
328 - }  
329 - }  
330 - entries.forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));  
331 - } else {  
332 - toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));  
333 - } 320 + if (rpcSequenceEnabled) {
  321 + toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  322 + } else if (sessionType == SessionType.ASYNC) {
  323 + toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
334 } else { 324 } else {
335 toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds)); 325 toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
336 } 326 }
@@ -348,7 +338,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -348,7 +338,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
348 return entry -> { 338 return entry -> {
349 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg(); 339 ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
350 ToDeviceRpcRequestBody body = request.getBody(); 340 ToDeviceRpcRequestBody body = request.getBody();
351 - if (request.isOneway()) { 341 + if (request.isOneway() && !rpcSequenceEnabled) {
352 sentOneWayIds.add(entry.getKey()); 342 sentOneWayIds.add(entry.getKey());
353 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null)); 343 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null));
354 } 344 }
@@ -357,6 +347,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -357,6 +347,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
357 .setMethodName(body.getMethod()) 347 .setMethodName(body.getMethod())
358 .setParams(body.getParams()) 348 .setParams(body.getParams())
359 .setExpirationTime(request.getExpirationTime()) 349 .setExpirationTime(request.getExpirationTime())
  350 + .setTimeout(request.getTimeout())
360 .setRequestIdMSB(request.getId().getMostSignificantBits()) 351 .setRequestIdMSB(request.getId().getMostSignificantBits())
361 .setRequestIdLSB(request.getId().getLeastSignificantBits()) 352 .setRequestIdLSB(request.getId().getLeastSignificantBits())
362 .setOneway(request.isOneway()) 353 .setOneway(request.isOneway())
@@ -395,8 +386,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -395,8 +386,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
395 if (msg.hasClaimDevice()) { 386 if (msg.hasClaimDevice()) {
396 handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice()); 387 handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
397 } 388 }
398 - if (msg.hasPersistedRpcResponseMsg()) {  
399 - processPersistedRpcResponses(context, sessionInfo, msg.getPersistedRpcResponseMsg()); 389 + if (msg.hasRpcResponseStatusMsg()) {
  390 + processPersistedRpcResponses(context, sessionInfo, msg.getRpcResponseStatusMsg());
400 } 391 }
401 if (msg.hasUplinkNotificationMsg()) { 392 if (msg.hasUplinkNotificationMsg()) {
402 processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg()); 393 processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
@@ -556,27 +547,32 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -556,27 +547,32 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
556 boolean success = requestMd != null; 547 boolean success = requestMd != null;
557 if (success) { 548 if (success) {
558 boolean hasError = StringUtils.isNotEmpty(responseMsg.getError()); 549 boolean hasError = StringUtils.isNotEmpty(responseMsg.getError());
559 - String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();  
560 - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(  
561 - new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),  
562 - payload, null));  
563 - if (requestMd.getMsg().getMsg().isPersisted()) {  
564 - RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;  
565 - JsonNode response;  
566 - try {  
567 - response = JacksonUtil.toJsonNode(payload);  
568 - } catch (IllegalArgumentException e) {  
569 - response = JacksonUtil.newObjectNode().put("error", payload); 550 + try {
  551 + String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
  552 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(
  553 + new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
  554 + payload, null));
  555 + if (requestMd.getMsg().getMsg().isPersisted()) {
  556 + RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
  557 + JsonNode response;
  558 + try {
  559 + response = JacksonUtil.toJsonNode(payload);
  560 + } catch (IllegalArgumentException e) {
  561 + response = JacksonUtil.newObjectNode().put("error", payload);
  562 + }
  563 + systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
  564 + }
  565 + } finally {
  566 + if (!requestMd.isDelivered() && hasError) {
  567 + sendNextPendingRequest(context);
570 } 568 }
571 - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);  
572 } 569 }
573 - sendNextPendingRequest(context);  
574 } else { 570 } else {
575 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); 571 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
576 } 572 }
577 } 573 }
578 574
579 - private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDevicePersistedRpcResponseMsg responseMsg) { 575 + private void processPersistedRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseStatusMsg responseMsg) {
580 UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB()); 576 UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
581 RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus()); 577 RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus());
582 ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId()); 578 ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
@@ -585,6 +581,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -585,6 +581,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
585 if (status.equals(RpcStatus.DELIVERED)) { 581 if (status.equals(RpcStatus.DELIVERED)) {
586 if (md.getMsg().getMsg().isOneway()) { 582 if (md.getMsg().getMsg().isOneway()) {
587 toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); 583 toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  584 + if (rpcSequenceEnabled) {
  585 + systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
  586 + }
588 } else { 587 } else {
589 md.setDelivered(true); 588 md.setDelivered(true);
590 } 589 }
@@ -597,7 +596,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -597,7 +596,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
597 } 596 }
598 } 597 }
599 598
600 - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null); 599 + if (md.getMsg().getMsg().isPersisted()) {
  600 + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, null);
  601 + }
601 if (status != RpcStatus.SENT) { 602 if (status != RpcStatus.SENT) {
602 sendNextPendingRequest(context); 603 sendNextPendingRequest(context);
603 } 604 }
@@ -75,8 +75,8 @@ public abstract class AbstractRpcController extends BaseController { @@ -75,8 +75,8 @@ public abstract class AbstractRpcController extends BaseController {
75 SecurityUser currentUser = getCurrentUser(); 75 SecurityUser currentUser = getCurrentUser();
76 TenantId tenantId = currentUser.getTenantId(); 76 TenantId tenantId = currentUser.getTenantId();
77 final DeferredResult<ResponseEntity> response = new DeferredResult<>(); 77 final DeferredResult<ResponseEntity> response = new DeferredResult<>();
78 - long timeout = rpcRequestBody.has("timeout") ? rpcRequestBody.get("timeout").asLong() : defaultTimeout;  
79 - long expTime = System.currentTimeMillis() + Math.max(minTimeout, timeout); 78 + long timeout = rpcRequestBody.has(DataConstants.TIMEOUT) ? rpcRequestBody.get(DataConstants.TIMEOUT).asLong() : defaultTimeout;
  79 + long expTime = rpcRequestBody.has(DataConstants.EXPIRATION_TIME) ? rpcRequestBody.get(DataConstants.EXPIRATION_TIME).asLong() : System.currentTimeMillis() + Math.max(minTimeout, timeout);
80 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID(); 80 UUID rpcRequestUUID = rpcRequestBody.has("requestUUID") ? UUID.fromString(rpcRequestBody.get("requestUUID").asText()) : UUID.randomUUID();
81 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean(); 81 boolean persisted = rpcRequestBody.has(DataConstants.PERSISTENT) && rpcRequestBody.get(DataConstants.PERSISTENT).asBoolean();
82 String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO)); 82 String additionalInfo = JacksonUtil.toString(rpcRequestBody.get(DataConstants.ADDITIONAL_INFO));
@@ -88,6 +88,7 @@ public abstract class AbstractRpcController extends BaseController { @@ -88,6 +88,7 @@ public abstract class AbstractRpcController extends BaseController {
88 deviceId, 88 deviceId,
89 oneWay, 89 oneWay,
90 expTime, 90 expTime,
  91 + timeout,
91 body, 92 body,
92 persisted, 93 persisted,
93 additionalInfo 94 additionalInfo
@@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @@ -101,7 +101,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
101 @Override 101 @Override
102 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) { 102 public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
103 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), 103 ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(),
104 - src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo()); 104 + src.isOneway(), src.getExpirationTime(), src.getTimeout(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getAdditionalInfo());
105 forwardRpcRequestToDeviceActor(request, response -> { 105 forwardRpcRequestToDeviceActor(request, response -> {
106 if (src.isRestApiCall()) { 106 if (src.isRestApiCall()) {
107 sendRpcResponseToTbCore(src.getOriginServiceId(), response); 107 sendRpcResponseToTbCore(src.getOriginServiceId(), response);
@@ -28,15 +28,16 @@ import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -28,15 +28,16 @@ import org.eclipse.paho.client.mqttv3.MqttCallback;
28 import org.eclipse.paho.client.mqttv3.MqttException; 28 import org.eclipse.paho.client.mqttv3.MqttException;
29 import org.eclipse.paho.client.mqttv3.MqttMessage; 29 import org.eclipse.paho.client.mqttv3.MqttMessage;
30 import org.junit.Assert; 30 import org.junit.Assert;
  31 +import org.thingsboard.common.util.JacksonUtil;
31 import org.thingsboard.server.common.data.Device; 32 import org.thingsboard.server.common.data.Device;
32 import org.thingsboard.server.common.data.TransportPayloadType; 33 import org.thingsboard.server.common.data.TransportPayloadType;
33 import org.thingsboard.server.common.data.device.profile.MqttTopics; 34 import org.thingsboard.server.common.data.device.profile.MqttTopics;
34 -import org.thingsboard.common.util.JacksonUtil;  
35 import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; 35 import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
36 36
37 import java.util.ArrayList; 37 import java.util.ArrayList;
38 import java.util.Arrays; 38 import java.util.Arrays;
39 import java.util.List; 39 import java.util.List;
  40 +import java.util.concurrent.CopyOnWriteArrayList;
40 import java.util.concurrent.CountDownLatch; 41 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.TimeUnit;
42 43
@@ -120,12 +121,13 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @@ -120,12 +121,13 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
120 } 121 }
121 122
122 MqttAsyncClient client = getMqttAsyncClient(accessToken); 123 MqttAsyncClient client = getMqttAsyncClient(accessToken);
  124 + client.setManualAcks(true);
123 CountDownLatch latch = new CountDownLatch(10); 125 CountDownLatch latch = new CountDownLatch(10);
124 TestSequenceMqttCallback callback = new TestSequenceMqttCallback(client, latch, result); 126 TestSequenceMqttCallback callback = new TestSequenceMqttCallback(client, latch, result);
125 client.setCallback(callback); 127 client.setCallback(callback);
126 client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1); 128 client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
127 129
128 - latch.await(30, TimeUnit.SECONDS); 130 + latch.await(10, TimeUnit.SECONDS);
129 Assert.assertEquals(expected, result); 131 Assert.assertEquals(expected, result);
130 } 132 }
131 133
@@ -246,8 +248,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @@ -246,8 +248,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
246 248
247 private final MqttAsyncClient client; 249 private final MqttAsyncClient client;
248 private final CountDownLatch latch; 250 private final CountDownLatch latch;
249 - private final List<String> expected;  
250 - private Integer qoS; 251 + private final List<String> expected;
251 252
252 TestSequenceMqttCallback(MqttAsyncClient client, CountDownLatch latch, List<String> expected) { 253 TestSequenceMqttCallback(MqttAsyncClient client, CountDownLatch latch, List<String> expected) {
253 this.client = client; 254 this.client = client;
@@ -255,10 +256,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @@ -255,10 +256,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
255 this.expected = expected; 256 this.expected = expected;
256 } 257 }
257 258
258 - int getQoS() {  
259 - return qoS;  
260 - }  
261 -  
262 @Override 259 @Override
263 public void connectionLost(Throwable throwable) { 260 public void connectionLost(Throwable throwable) {
264 } 261 }
@@ -268,7 +265,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @@ -268,7 +265,9 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
268 log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); 265 log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload()));
269 expected.add(new String(mqttMessage.getPayload())); 266 expected.add(new String(mqttMessage.getPayload()));
270 String responseTopic = requestTopic.replace("request", "response"); 267 String responseTopic = requestTopic.replace("request", "response");
271 - qoS = mqttMessage.getQos(); 268 + var qoS = mqttMessage.getQos();
  269 +
  270 + client.messageArrivedComplete(mqttMessage.getId(), qoS);
272 client.publish(responseTopic, processMessageArrived(requestTopic, mqttMessage)); 271 client.publish(responseTopic, processMessageArrived(requestTopic, mqttMessage));
273 latch.countDown(); 272 latch.countDown();
274 } 273 }
@@ -334,6 +334,7 @@ message ToDeviceRpcRequestMsg { @@ -334,6 +334,7 @@ message ToDeviceRpcRequestMsg {
334 int64 requestIdLSB = 6; 334 int64 requestIdLSB = 6;
335 bool oneway = 7; 335 bool oneway = 7;
336 bool persisted = 8; 336 bool persisted = 8;
  337 + int64 timeout = 9;
337 } 338 }
338 339
339 message ToDeviceRpcResponseMsg { 340 message ToDeviceRpcResponseMsg {
@@ -346,7 +347,7 @@ message UplinkNotificationMsg { @@ -346,7 +347,7 @@ message UplinkNotificationMsg {
346 int64 uplinkTs = 1; 347 int64 uplinkTs = 1;
347 } 348 }
348 349
349 -message ToDevicePersistedRpcResponseMsg { 350 +message ToDeviceRpcResponseStatusMsg {
350 int32 requestId = 1; 351 int32 requestId = 1;
351 int64 requestIdMSB = 2; 352 int64 requestIdMSB = 2;
352 int64 requestIdLSB = 3; 353 int64 requestIdLSB = 3;
@@ -456,7 +457,7 @@ message TransportToDeviceActorMsg { @@ -456,7 +457,7 @@ message TransportToDeviceActorMsg {
456 SubscriptionInfoProto subscriptionInfo = 7; 457 SubscriptionInfoProto subscriptionInfo = 7;
457 ClaimDeviceMsg claimDevice = 8; 458 ClaimDeviceMsg claimDevice = 8;
458 ProvisionDeviceRequestMsg provisionDevice = 9; 459 ProvisionDeviceRequestMsg provisionDevice = 9;
459 - ToDevicePersistedRpcResponseMsg persistedRpcResponseMsg = 10; 460 + ToDeviceRpcResponseStatusMsg rpcResponseStatusMsg = 10;
460 SendPendingRPCMsg sendPendingRPC = 11; 461 SendPendingRPCMsg sendPendingRPC = 11;
461 UplinkNotificationMsg uplinkNotificationMsg = 12; 462 UplinkNotificationMsg uplinkNotificationMsg = 12;
462 } 463 }
@@ -36,6 +36,8 @@ public class DataConstants { @@ -36,6 +36,8 @@ public class DataConstants {
36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats"; 36 public static final String ALARM_CONDITION_REPEATS = "alarmConditionRepeats";
37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration"; 37 public static final String ALARM_CONDITION_DURATION = "alarmConditionDuration";
38 public static final String PERSISTENT = "persistent"; 38 public static final String PERSISTENT = "persistent";
  39 + public static final String TIMEOUT = "timeout";
  40 + public static final String EXPIRATION_TIME = "expirationTime";
39 public static final String ADDITIONAL_INFO = "additionalInfo"; 41 public static final String ADDITIONAL_INFO = "additionalInfo";
40 public static final String COAP_TRANSPORT_NAME = "COAP"; 42 public static final String COAP_TRANSPORT_NAME = "COAP";
41 public static final String LWM2M_TRANSPORT_NAME = "LWM2M"; 43 public static final String LWM2M_TRANSPORT_NAME = "LWM2M";
@@ -34,6 +34,7 @@ public class ToDeviceRpcRequest implements Serializable { @@ -34,6 +34,7 @@ public class ToDeviceRpcRequest implements Serializable {
34 private final DeviceId deviceId; 34 private final DeviceId deviceId;
35 private final boolean oneway; 35 private final boolean oneway;
36 private final long expirationTime; 36 private final long expirationTime;
  37 + private final long timeout;
37 private final ToDeviceRpcRequestBody body; 38 private final ToDeviceRpcRequestBody body;
38 private final boolean persisted; 39 private final boolean persisted;
39 @JsonIgnore 40 @JsonIgnore
@@ -525,17 +525,26 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -525,17 +525,26 @@ public class DefaultCoapClientContext implements CoapClientContext {
525 Response response = state.getAdaptor().convertToPublish(conRequest, msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder()); 525 Response response = state.getAdaptor().convertToPublish(conRequest, msg, state.getConfiguration().getRpcRequestDynamicMessageBuilder());
526 int requestId = getNextMsgId(); 526 int requestId = getNextMsgId();
527 response.setMID(requestId); 527 response.setMID(requestId);
528 - if (msg.getPersisted() && conRequest) { 528 + if (conRequest) {
529 transportContext.getRpcAwaitingAck().put(requestId, msg); 529 transportContext.getRpcAwaitingAck().put(requestId, msg);
530 transportContext.getScheduler().schedule(() -> { 530 transportContext.getScheduler().schedule(() -> {
531 - transportContext.getRpcAwaitingAck().remove(requestId);  
532 - }, Math.max(0, msg.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); 531 + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(requestId);
  532 + if (rpcRequestMsg != null) {
  533 + transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  534 + }
  535 + }, Math.max(0, Math.min(msg.getTimeout(), msg.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
  536 +
533 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> { 537 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
534 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id); 538 TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
535 if (rpcRequestMsg != null) { 539 if (rpcRequestMsg != null) {
536 transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); 540 transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
537 } 541 }
538 - }, null)); 542 + }, id -> {
  543 + TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
  544 + if (rpcRequestMsg != null) {
  545 + transportService.process(state.getSession(), msg, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  546 + }
  547 + }));
539 } 548 }
540 if (conRequest) { 549 if (conRequest) {
541 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state))); 550 response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> awake(state), id -> asleep(state)));
@@ -554,11 +563,11 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -554,11 +563,11 @@ public class DefaultCoapClientContext implements CoapClientContext {
554 transportService.process(state.getSession(), 563 transportService.process(state.getSession(),
555 TransportProtos.ToDeviceRpcResponseMsg.newBuilder() 564 TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
556 .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY); 565 .setRequestId(msg.getRequestId()).setError(error).build(), TransportServiceCallback.EMPTY);
557 - } else if (msg.getPersisted() && sent) {  
558 - if (conRequest) {  
559 - transportService.process(state.getSession(), msg, RpcStatus.SENT, TransportServiceCallback.EMPTY);  
560 - } else { 566 + } else if (sent) {
  567 + if (!conRequest) {
561 transportService.process(state.getSession(), msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); 568 transportService.process(state.getSession(), msg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  569 + } else if (msg.getPersisted()) {
  570 + transportService.process(state.getSession(), msg, RpcStatus.SENT, TransportServiceCallback.EMPTY);
562 } 571 }
563 } 572 }
564 } 573 }
@@ -850,24 +850,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -850,24 +850,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
850 try { 850 try {
851 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> { 851 deviceSessionCtx.getPayloadAdaptor().convertToPublish(deviceSessionCtx, rpcRequest).ifPresent(payload -> {
852 int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); 852 int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
853 - if (rpcRequest.getPersisted() && isAckExpected(payload)) { 853 + if (isAckExpected(payload)) {
854 rpcAwaitingAck.put(msgId, rpcRequest); 854 rpcAwaitingAck.put(msgId, rpcRequest);
855 context.getScheduler().schedule(() -> { 855 context.getScheduler().schedule(() -> {
856 - rpcAwaitingAck.remove(msgId);  
857 - }, Math.max(0, rpcRequest.getExpirationTime() - System.currentTimeMillis()), TimeUnit.MILLISECONDS); 856 + TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
  857 + if (msg != null) {
  858 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
  859 + }
  860 + }, Math.max(0, Math.min(rpcRequest.getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
858 } 861 }
859 var cf = publish(payload, deviceSessionCtx); 862 var cf = publish(payload, deviceSessionCtx);
860 - if (rpcRequest.getPersisted()) {  
861 - cf.addListener(result -> {  
862 - if (result.cause() == null) {  
863 - if (isAckExpected(payload)) {  
864 - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);  
865 - } else {  
866 - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);  
867 - } 863 + cf.addListener(result -> {
  864 + if (result.cause() == null) {
  865 + if (!isAckExpected(payload)) {
  866 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
  867 + } else if (rpcRequest.getPersisted()) {
  868 + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
868 } 869 }
869 - });  
870 - } 870 + } else {
  871 + // TODO: send error
  872 + }
  873 + });
871 }); 874 });
872 } catch (Exception e) { 875 } catch (Exception e) {
873 transportService.process(deviceSessionCtx.getSessionInfo(), 876 transportService.process(deviceSessionCtx.getSessionInfo(),
@@ -581,19 +581,17 @@ public class DefaultTransportService implements TransportService { @@ -581,19 +581,17 @@ public class DefaultTransportService implements TransportService {
581 581
582 @Override 582 @Override
583 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback) { 583 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback) {
584 - if (msg.getPersisted()) {  
585 - TransportProtos.ToDevicePersistedRpcResponseMsg responseMsg = TransportProtos.ToDevicePersistedRpcResponseMsg.newBuilder()  
586 - .setRequestId(msg.getRequestId())  
587 - .setRequestIdLSB(msg.getRequestIdLSB())  
588 - .setRequestIdMSB(msg.getRequestIdMSB())  
589 - .setStatus(rpcStatus.name())  
590 - .build();  
591 -  
592 - if (checkLimits(sessionInfo, responseMsg, callback)) {  
593 - reportActivityInternal(sessionInfo);  
594 - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPersistedRpcResponseMsg(responseMsg).build(),  
595 - new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, TransportServiceCallback.EMPTY));  
596 - } 584 + TransportProtos.ToDeviceRpcResponseStatusMsg responseMsg = TransportProtos.ToDeviceRpcResponseStatusMsg.newBuilder()
  585 + .setRequestId(msg.getRequestId())
  586 + .setRequestIdLSB(msg.getRequestIdLSB())
  587 + .setRequestIdMSB(msg.getRequestIdMSB())
  588 + .setStatus(rpcStatus.name())
  589 + .build();
  590 +
  591 + if (checkLimits(sessionInfo, responseMsg, callback)) {
  592 + reportActivityInternal(sessionInfo);
  593 + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setRpcResponseStatusMsg(responseMsg).build(),
  594 + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, TransportServiceCallback.EMPTY));
597 } 595 }
598 } 596 }
599 597
@@ -39,6 +39,7 @@ public final class RuleEngineDeviceRpcRequest { @@ -39,6 +39,7 @@ public final class RuleEngineDeviceRpcRequest {
39 private final String method; 39 private final String method;
40 private final String body; 40 private final String body;
41 private final long expirationTime; 41 private final long expirationTime;
  42 + private final long timeout;
42 private final boolean restApiCall; 43 private final boolean restApiCall;
43 private final String additionalInfo; 44 private final String additionalInfo;
44 45
@@ -89,9 +89,12 @@ public class TbSendRPCRequestNode implements TbNode { @@ -89,9 +89,12 @@ public class TbSendRPCRequestNode implements TbNode {
89 tmp = msg.getMetaData().getValue("originServiceId"); 89 tmp = msg.getMetaData().getValue("originServiceId");
90 String originServiceId = !StringUtils.isEmpty(tmp) ? tmp : null; 90 String originServiceId = !StringUtils.isEmpty(tmp) ? tmp : null;
91 91
92 - tmp = msg.getMetaData().getValue("expirationTime"); 92 + tmp = msg.getMetaData().getValue(DataConstants.EXPIRATION_TIME);
93 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds())); 93 long expirationTime = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : (System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds()));
94 94
  95 + tmp = msg.getMetaData().getValue(DataConstants.TIMEOUT);
  96 + long timeout = !StringUtils.isEmpty(tmp) ? Long.parseLong(tmp) : TimeUnit.SECONDS.toMillis(config.getTimeoutInSeconds());
  97 +
95 String params; 98 String params;
96 JsonElement paramsEl = json.get("params"); 99 JsonElement paramsEl = json.get("params");
97 if (paramsEl.isJsonPrimitive()) { 100 if (paramsEl.isJsonPrimitive()) {
@@ -112,6 +115,7 @@ public class TbSendRPCRequestNode implements TbNode { @@ -112,6 +115,7 @@ public class TbSendRPCRequestNode implements TbNode {
112 .requestUUID(requestUUID) 115 .requestUUID(requestUUID)
113 .originServiceId(originServiceId) 116 .originServiceId(originServiceId)
114 .expirationTime(expirationTime) 117 .expirationTime(expirationTime)
  118 + .timeout(timeout)
115 .restApiCall(restApiCall) 119 .restApiCall(restApiCall)
116 .persisted(persisted) 120 .persisted(persisted)
117 .additionalInfo(additionalInfo) 121 .additionalInfo(additionalInfo)