Commit 6436c8a26cc74545b10834717022db8d960d9a8e

Authored by YevhenBondarenko
1 parent 2ddd5caa

Implemented rpc sending sequence

@@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
48 import org.thingsboard.server.common.data.kv.KvEntry; 48 import org.thingsboard.server.common.data.kv.KvEntry;
49 import org.thingsboard.server.common.data.page.PageData; 49 import org.thingsboard.server.common.data.page.PageData;
50 import org.thingsboard.server.common.data.page.PageLink; 50 import org.thingsboard.server.common.data.page.PageLink;
  51 +import org.thingsboard.server.common.data.page.SortOrder;
51 import org.thingsboard.server.common.data.relation.EntityRelation; 52 import org.thingsboard.server.common.data.relation.EntityRelation;
52 import org.thingsboard.server.common.data.relation.RelationTypeGroup; 53 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
53 import org.thingsboard.server.common.data.rpc.Rpc; 54 import org.thingsboard.server.common.data.rpc.Rpc;
@@ -98,6 +99,7 @@ import java.util.Arrays; @@ -98,6 +99,7 @@ import java.util.Arrays;
98 import java.util.Collections; 99 import java.util.Collections;
99 import java.util.HashMap; 100 import java.util.HashMap;
100 import java.util.HashSet; 101 import java.util.HashSet;
  102 +import java.util.LinkedHashMap;
101 import java.util.List; 103 import java.util.List;
102 import java.util.Map; 104 import java.util.Map;
103 import java.util.Objects; 105 import java.util.Objects;
@@ -132,7 +134,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -132,7 +134,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
132 this.deviceId = deviceId; 134 this.deviceId = deviceId;
133 this.attributeSubscriptions = new HashMap<>(); 135 this.attributeSubscriptions = new HashMap<>();
134 this.rpcSubscriptions = new HashMap<>(); 136 this.rpcSubscriptions = new HashMap<>();
135 - this.toDeviceRpcPendingMap = new HashMap<>(); 137 + this.toDeviceRpcPendingMap = new LinkedHashMap<>();
136 this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit); 138 this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit);
137 if (initAttributes()) { 139 if (initAttributes()) {
138 restoreSessions(); 140 restoreSessions();
@@ -294,10 +296,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -294,10 +296,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
294 } 296 }
295 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), 297 systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
296 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); 298 null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
  299 + sendNextPendingRequest(context);
297 } 300 }
298 } 301 }
299 302
300 - private void sendPendingRequests(TbActorCtx context, UUID sessionId, SessionInfoProto sessionInfo) { 303 + private void sendPendingRequest(TbActorCtx context, UUID sessionId, String nodeId) {
301 SessionType sessionType = getSessionType(sessionId); 304 SessionType sessionType = getSessionType(sessionId);
302 if (!toDeviceRpcPendingMap.isEmpty()) { 305 if (!toDeviceRpcPendingMap.isEmpty()) {
303 log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); 306 log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
@@ -309,13 +312,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -309,13 +312,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
309 log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId); 312 log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
310 } 313 }
311 Set<Integer> sentOneWayIds = new HashSet<>(); 314 Set<Integer> sentOneWayIds = new HashSet<>();
312 - if (sessionType == SessionType.ASYNC) {  
313 - toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));  
314 - } else {  
315 - toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, sessionInfo.getNodeId(), sentOneWayIds));  
316 - } 315 + toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  316 + }
317 317
318 - sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove); 318 + private void sendNextPendingRequest(TbActorCtx context) {
  319 + rpcSubscriptions.forEach((id, s) -> sendPendingRequest(context, id, s.getNodeId()));
319 } 320 }
320 321
321 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) { 322 private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
@@ -337,6 +338,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -337,6 +338,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
337 .setPersisted(request.isPersisted()) 338 .setPersisted(request.isPersisted())
338 .build(); 339 .build();
339 sendToTransport(rpcRequest, sessionId, nodeId); 340 sendToTransport(rpcRequest, sessionId, nodeId);
  341 +
  342 + if (SessionType.ASYNC.equals(getSessionType(sessionId)) && request.isOneway() && !request.isPersisted()) {
  343 + toDeviceRpcPendingMap.remove(entry.getKey());
  344 + sendPendingRequest(context, sessionId, nodeId);
  345 + }
340 }; 346 };
341 } 347 }
342 348
@@ -355,7 +361,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -355,7 +361,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
355 processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC()); 361 processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
356 } 362 }
357 if (msg.hasSendPendingRPC()) { 363 if (msg.hasSendPendingRPC()) {
358 - sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo); 364 + sendPendingRequest(context, getSessionId(sessionInfo), sessionInfo.getNodeId());
359 } 365 }
360 if (msg.hasGetAttributes()) { 366 if (msg.hasGetAttributes()) {
361 handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes()); 367 handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
@@ -544,6 +550,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -544,6 +550,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
544 } 550 }
545 systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response); 551 systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
546 } 552 }
  553 + sendNextPendingRequest(context);
547 } else { 554 } else {
548 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); 555 log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
549 } 556 }
@@ -601,7 +608,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -601,7 +608,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
601 sessionMD.setSubscribedToRPC(true); 608 sessionMD.setSubscribedToRPC(true);
602 log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); 609 log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
603 rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); 610 rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
604 - sendPendingRequests(context, sessionId, sessionInfo); 611 + sendPendingRequest(context, sessionId, sessionInfo.getNodeId());
605 dumpSessions(); 612 dumpSessions();
606 } 613 }
607 } 614 }
@@ -869,7 +876,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -869,7 +876,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
869 876
870 void init(TbActorCtx ctx) { 877 void init(TbActorCtx ctx) {
871 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout()); 878 schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
872 - PageLink pageLink = new PageLink(1024); 879 + PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
873 PageData<Rpc> pageData; 880 PageData<Rpc> pageData;
874 do { 881 do {
875 pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink); 882 pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
@@ -90,6 +90,11 @@ public abstract class AbstractMqttServerSideRpcDefaultIntegrationTest extends Ab @@ -90,6 +90,11 @@ public abstract class AbstractMqttServerSideRpcDefaultIntegrationTest extends Ab
90 } 90 }
91 91
92 @Test 92 @Test
  93 + public void testSequenceServerMqttTwoWayRpc() throws Exception {
  94 + processSequenceTwoWayRpcTest();
  95 + }
  96 +
  97 + @Test
93 public void testGatewayServerMqttOneWayRpc() throws Exception { 98 public void testGatewayServerMqttOneWayRpc() throws Exception {
94 processOneWayRpcTestGateway("Gateway Device OneWay RPC"); 99 processOneWayRpcTestGateway("Gateway Device OneWay RPC");
95 } 100 }
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.transport.mqtt.rpc; 16 package org.thingsboard.server.transport.mqtt.rpc;
17 17
18 import com.fasterxml.jackson.databind.JsonNode; 18 import com.fasterxml.jackson.databind.JsonNode;
  19 +import com.fasterxml.jackson.databind.node.ObjectNode;
19 import com.google.protobuf.InvalidProtocolBufferException; 20 import com.google.protobuf.InvalidProtocolBufferException;
20 import com.nimbusds.jose.util.StandardCharset; 21 import com.nimbusds.jose.util.StandardCharset;
21 import io.netty.handler.codec.mqtt.MqttQoS; 22 import io.netty.handler.codec.mqtt.MqttQoS;
@@ -33,7 +34,9 @@ import org.thingsboard.server.common.data.device.profile.MqttTopics; @@ -33,7 +34,9 @@ import org.thingsboard.server.common.data.device.profile.MqttTopics;
33 import org.thingsboard.common.util.JacksonUtil; 34 import org.thingsboard.common.util.JacksonUtil;
34 import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; 35 import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
35 36
  37 +import java.util.ArrayList;
36 import java.util.Arrays; 38 import java.util.Arrays;
  39 +import java.util.List;
37 import java.util.concurrent.CountDownLatch; 40 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit; 41 import java.util.concurrent.TimeUnit;
39 42
@@ -101,6 +104,31 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @@ -101,6 +104,31 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
101 Assert.assertEquals(expected, result); 104 Assert.assertEquals(expected, result);
102 } 105 }
103 106
  107 + protected void processSequenceTwoWayRpcTest() throws Exception {
  108 + List<String> expected = new ArrayList<>();
  109 + List<String> result = new ArrayList<>();
  110 +
  111 + String deviceId = savedDevice.getId().getId().toString();
  112 +
  113 + for (int i = 0; i < 10; i++) {
  114 + ObjectNode request = JacksonUtil.newObjectNode();
  115 + request.put("method", "test");
  116 + request.put("params", i);
  117 + expected.add(JacksonUtil.toString(request));
  118 + request.put("persistent", true);
  119 + doPostAsync("/api/rpc/twoway/" + deviceId, JacksonUtil.toString(request), String.class, status().isOk());
  120 + }
  121 +
  122 + MqttAsyncClient client = getMqttAsyncClient(accessToken);
  123 + CountDownLatch latch = new CountDownLatch(10);
  124 + TestSequenceMqttCallback callback = new TestSequenceMqttCallback(client, latch, result);
  125 + client.setCallback(callback);
  126 + client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, 1);
  127 +
  128 + latch.await(30, TimeUnit.SECONDS);
  129 + Assert.assertEquals(expected, result);
  130 + }
  131 +
104 protected void processTwoWayRpcTestGateway(String deviceName) throws Exception { 132 protected void processTwoWayRpcTestGateway(String deviceName) throws Exception {
105 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); 133 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
106 134
@@ -213,4 +241,41 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM @@ -213,4 +241,41 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
213 241
214 } 242 }
215 } 243 }
  244 +
  245 + protected class TestSequenceMqttCallback implements MqttCallback {
  246 +
  247 + private final MqttAsyncClient client;
  248 + private final CountDownLatch latch;
  249 + private final List<String> expected;
  250 + private Integer qoS;
  251 +
  252 + TestSequenceMqttCallback(MqttAsyncClient client, CountDownLatch latch, List<String> expected) {
  253 + this.client = client;
  254 + this.latch = latch;
  255 + this.expected = expected;
  256 + }
  257 +
  258 + int getQoS() {
  259 + return qoS;
  260 + }
  261 +
  262 + @Override
  263 + public void connectionLost(Throwable throwable) {
  264 + }
  265 +
  266 + @Override
  267 + public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception {
  268 + log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload()));
  269 + expected.add(new String(mqttMessage.getPayload()));
  270 + String responseTopic = requestTopic.replace("request", "response");
  271 + qoS = mqttMessage.getQos();
  272 + client.publish(responseTopic, processMessageArrived(requestTopic, mqttMessage));
  273 + latch.countDown();
  274 + }
  275 +
  276 + @Override
  277 + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  278 +
  279 + }
  280 + }
216 } 281 }
@@ -723,7 +723,7 @@ public class DefaultCoapClientContext implements CoapClientContext { @@ -723,7 +723,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
723 private void cancelRpcSubscription(TbCoapClientState state) { 723 private void cancelRpcSubscription(TbCoapClientState state) {
724 if (state.getRpc() != null) { 724 if (state.getRpc() != null) {
725 clientsByToken.remove(state.getRpc().getToken()); 725 clientsByToken.remove(state.getRpc().getToken());
726 - CoapExchange exchange = state.getAttrs().getExchange(); 726 + CoapExchange exchange = state.getRpc().getExchange();
727 state.setRpc(null); 727 state.setRpc(null);
728 transportService.process(state.getSession(), 728 transportService.process(state.getSession(),
729 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), 729 TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),