Commit 54b609e8bd2ee157e217c1fc1ccc363bf7bd6352

Authored by Igor Kulikov
1 parent bc50ef88

Fix Tenant Actor: Ack transport to device message when no tenant exists. Improve MQTT tests.

... ... @@ -47,6 +47,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
47 47 import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
48 48 import org.thingsboard.server.common.msg.queue.RuleEngineException;
49 49 import org.thingsboard.server.common.msg.queue.ServiceType;
  50 +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
50 51
51 52 import java.util.List;
52 53 import java.util.Optional;
... ... @@ -116,6 +117,9 @@ public class TenantActor extends RuleChainManagerActor {
116 117 if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
117 118 QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
118 119 queueMsg.getTbMsg().getCallback().onSuccess();
  120 + } else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)){
  121 + TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
  122 + transportMsg.getCallback().onSuccess();
119 123 }
120 124 return true;
121 125 }
... ...
... ... @@ -42,6 +42,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
42 42 import java.util.ArrayList;
43 43 import java.util.List;
44 44 import java.util.concurrent.atomic.AtomicInteger;
  45 +import java.util.function.Supplier;
45 46
46 47 import static org.junit.Assert.assertEquals;
47 48 import static org.junit.Assert.assertNotNull;
... ... @@ -213,4 +214,33 @@ public abstract class AbstractMqttIntegrationTest extends AbstractControllerTest
213 214 return builder.build();
214 215 }
215 216
  217 + protected <T> T doExecuteWithRetriesAndInterval(SupplierWithThrowable<T> supplier, int retries, int intervalMs) throws Exception {
  218 + int count = 0;
  219 + T result = null;
  220 + Throwable lastException = null;
  221 + while (count < retries) {
  222 + try {
  223 + result = supplier.get();
  224 + if (result != null) {
  225 + return result;
  226 + }
  227 + } catch (Throwable e) {
  228 + lastException = e;
  229 + }
  230 + count++;
  231 + if (count < retries) {
  232 + Thread.sleep(intervalMs);
  233 + }
  234 + }
  235 + if (lastException != null) {
  236 + throw new RuntimeException(lastException);
  237 + } else {
  238 + return result;
  239 + }
  240 + }
  241 +
  242 + @FunctionalInterface
  243 + public interface SupplierWithThrowable<T> {
  244 + T get() throws Throwable;
  245 + }
216 246 }
... ...
... ... @@ -84,10 +84,12 @@ public abstract class AbstractMqttAttributesRequestIntegrationTest extends Abstr
84 84
85 85 postGatewayDeviceClientAttributes(client);
86 86
87   - Thread.sleep(1000);
  87 + Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Request Attributes", Device.class),
  88 + 20,
  89 + 100);
88 90
89   - Device savedDevice = doGet("/api/tenant/devices?deviceName=" + "Gateway Device Request Attributes", Device.class);
90 91 assertNotNull(savedDevice);
  92 +
91 93 doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
92 94
93 95 Thread.sleep(1000);
... ...
... ... @@ -84,7 +84,7 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
84 84
85 85 client.subscribe(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
86 86
87   - Thread.sleep(2000);
  87 + Thread.sleep(1000);
88 88
89 89 doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
90 90 onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS);
... ... @@ -127,14 +127,15 @@ public abstract class AbstractMqttAttributesUpdatesIntegrationTest extends Abstr
127 127
128 128 publishMqttMsg(client, connectPayloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC);
129 129
130   - Thread.sleep(1000);
  130 + Device savedDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + "Gateway Device Subscribe to attribute updates", Device.class),
  131 + 20,
  132 + 100);
131 133
132   - Device savedDevice = doGet("/api/tenant/devices?deviceName=" + "Gateway Device Subscribe to attribute updates", Device.class);
133 134 assertNotNull(savedDevice);
134 135
135 136 client.subscribe(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE.value());
136 137
137   - Thread.sleep(2000);
  138 + Thread.sleep(1000);
138 139
139 140 doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", POST_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
140 141 onUpdateCallback.getLatch().await(3, TimeUnit.SECONDS);
... ...
... ... @@ -75,25 +75,21 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
75 75 }
76 76
77 77 @Test
78   - @Ignore
79 78 public void testClaimingDevice() throws Exception {
80 79 processTestClaimingDevice(false);
81 80 }
82 81
83 82 @Test
84   - @Ignore
85 83 public void testClaimingDeviceWithoutSecretAndDuration() throws Exception {
86 84 processTestClaimingDevice(true);
87 85 }
88 86
89 87 @Test
90   - @Ignore
91 88 public void testGatewayClaimingDevice() throws Exception {
92 89 processTestGatewayClaimingDevice("Test claiming gateway device", false);
93 90 }
94 91
95 92 @Test
96   - @Ignore
97 93 public void testGatewayClaimingDeviceWithoutSecretAndDuration() throws Exception {
98 94 processTestGatewayClaimingDevice("Test claiming gateway device empty payload", true);
99 95 }
... ... @@ -116,8 +112,6 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
116 112 protected void validateClaimResponse(boolean emptyPayload, MqttAsyncClient client, byte[] payloadBytes, byte[] failurePayloadBytes) throws Exception {
117 113 client.publish(MqttTopics.DEVICE_CLAIM_TOPIC, new MqttMessage(failurePayloadBytes));
118 114
119   - Thread.sleep(2000);
120   -
121 115 loginUser(customerAdmin.getName(), CUSTOMER_USER_PASSWORD);
122 116 ClaimRequest claimRequest;
123 117 if (!emptyPayload) {
... ... @@ -126,14 +120,21 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
126 120 claimRequest = new ClaimRequest(null);
127 121 }
128 122
129   - ClaimResponse claimResponse = doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResponse.class, status().isBadRequest());
  123 + ClaimResponse claimResponse = doExecuteWithRetriesAndInterval(
  124 + () -> doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResponse.class, status().isBadRequest()),
  125 + 20,
  126 + 100
  127 + );
  128 +
130 129 assertEquals(claimResponse, ClaimResponse.FAILURE);
131 130
132 131 client.publish(MqttTopics.DEVICE_CLAIM_TOPIC, new MqttMessage(payloadBytes));
133 132
134   - Thread.sleep(2000);
135   -
136   - ClaimResult claimResult = doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResult.class, status().isOk());
  133 + ClaimResult claimResult = doExecuteWithRetriesAndInterval(
  134 + () -> doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResult.class, status().isOk()),
  135 + 20,
  136 + 100
  137 + );
137 138 assertEquals(claimResult.getResponse(), ClaimResponse.SUCCESS);
138 139 Device claimedDevice = claimResult.getDevice();
139 140 assertNotNull(claimedDevice);
... ... @@ -147,9 +148,12 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
147 148 protected void validateGatewayClaimResponse(String deviceName, boolean emptyPayload, MqttAsyncClient client, byte[] failurePayloadBytes, byte[] payloadBytes) throws Exception {
148 149 client.publish(MqttTopics.GATEWAY_CLAIM_TOPIC, new MqttMessage(failurePayloadBytes));
149 150
150   - Thread.sleep(2000);
  151 + Device savedDevice = doExecuteWithRetriesAndInterval(
  152 + () -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
  153 + 20,
  154 + 100
  155 + );
151 156
152   - Device savedDevice = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
153 157 assertNotNull(savedDevice);
154 158
155 159 loginUser(customerAdmin.getName(), CUSTOMER_USER_PASSWORD);
... ... @@ -165,9 +169,12 @@ public abstract class AbstractMqttClaimDeviceTest extends AbstractMqttIntegratio
165 169
166 170 client.publish(MqttTopics.GATEWAY_CLAIM_TOPIC, new MqttMessage(payloadBytes));
167 171
168   - Thread.sleep(2000);
  172 + ClaimResult claimResult = doExecuteWithRetriesAndInterval(
  173 + () -> doPostClaimAsync("/api/customer/device/" + deviceName + "/claim", claimRequest, ClaimResult.class, status().isOk()),
  174 + 20,
  175 + 100
  176 + );
169 177
170   - ClaimResult claimResult = doPostClaimAsync("/api/customer/device/" + deviceName + "/claim", claimRequest, ClaimResult.class, status().isOk());
171 178 assertEquals(claimResult.getResponse(), ClaimResponse.SUCCESS);
172 179 Device claimedDevice = claimResult.getDevice();
173 180 assertNotNull(claimedDevice);
... ...
... ... @@ -52,7 +52,6 @@ public abstract class AbstractMqttClaimJsonDeviceTest extends AbstractMqttClaimD
52 52 }
53 53
54 54 @Test
55   - @Ignore
56 55 public void testGatewayClaimingDeviceWithoutSecretAndDuration() throws Exception {
57 56 processTestGatewayClaimingDevice("Test claiming gateway device empty payload Json", true);
58 57 }
... ...
... ... @@ -37,25 +37,21 @@ public abstract class AbstractMqttClaimProtoDeviceTest extends AbstractMqttClaim
37 37 public void afterTest() throws Exception { super.afterTest(); }
38 38
39 39 @Test
40   - @Ignore
41 40 public void testClaimingDevice() throws Exception {
42 41 processTestClaimingDevice(false);
43 42 }
44 43
45 44 @Test
46   - @Ignore
47 45 public void testClaimingDeviceWithoutSecretAndDuration() throws Exception {
48 46 processTestClaimingDevice(true);
49 47 }
50 48
51 49 @Test
52   - @Ignore
53 50 public void testGatewayClaimingDevice() throws Exception {
54 51 processTestGatewayClaimingDevice("Test claiming gateway device Proto", false);
55 52 }
56 53
57 54 @Test
58   - @Ignore
59 55 public void testGatewayClaimingDeviceWithoutSecretAndDuration() throws Exception {
60 56 processTestGatewayClaimingDevice("Test claiming gateway device empty payload Proto", true);
61 57 }
... ...
... ... @@ -84,7 +84,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
84 84
85 85 client.subscribe(MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_MOST_ONCE.value());
86 86
87   - Thread.sleep(2000);
  87 + Thread.sleep(1000);
88 88
89 89 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
90 90 String deviceId = savedDevice.getId().getId().toString();
... ... @@ -109,7 +109,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
109 109 TestMqttCallback callback = new TestMqttCallback(client, latch);
110 110 client.setCallback(callback);
111 111
112   - Thread.sleep(2000);
  112 + Thread.sleep(1000);
113 113
114 114 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";
115 115 String deviceId = savedDevice.getId().getId().toString();
... ... @@ -132,9 +132,11 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
132 132 protected void validateOneWayRpcGatewayResponse(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception {
133 133 publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC);
134 134
135   - Thread.sleep(2000);
136   -
137   - Device savedDevice = getDeviceByName(deviceName);
  135 + Device savedDevice = doExecuteWithRetriesAndInterval(
  136 + () -> getDeviceByName(deviceName),
  137 + 20,
  138 + 100
  139 + );
138 140 assertNotNull(savedDevice);
139 141
140 142 CountDownLatch latch = new CountDownLatch(1);
... ... @@ -143,7 +145,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
143 145
144 146 client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value());
145 147
146   - Thread.sleep(2000);
  148 + Thread.sleep(1000);
147 149
148 150 String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}";
149 151 String deviceId = savedDevice.getId().getId().toString();
... ... @@ -156,9 +158,11 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
156 158 protected void validateTwoWayRpcGateway(String deviceName, MqttAsyncClient client, byte[] payloadBytes) throws Exception {
157 159 publishMqttMsg(client, payloadBytes, MqttTopics.GATEWAY_CONNECT_TOPIC);
158 160
159   - Thread.sleep(2000);
160   -
161   - Device savedDevice = getDeviceByName(deviceName);
  161 + Device savedDevice = doExecuteWithRetriesAndInterval(
  162 + () -> getDeviceByName(deviceName),
  163 + 20,
  164 + 100
  165 + );
162 166 assertNotNull(savedDevice);
163 167
164 168 CountDownLatch latch = new CountDownLatch(1);
... ... @@ -167,7 +171,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
167 171
168 172 client.subscribe(MqttTopics.GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE.value());
169 173
170   - Thread.sleep(2000);
  174 + Thread.sleep(1000);
171 175
172 176 String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}";
173 177 String deviceId = savedDevice.getId().getId().toString();
... ...
... ... @@ -106,13 +106,20 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
106 106
107 107 publishMqttMsg(client, payload, MqttTopics.GATEWAY_ATTRIBUTES_TOPIC);
108 108
109   - Thread.sleep(2000);
  109 + Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class),
  110 + 20,
  111 + 100);
110 112
111   - Device firstDevice = doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class);
112 113 assertNotNull(firstDevice);
113   - Device secondDevice = doGet("/api/tenant/devices?deviceName=" + secondDeviceName, Device.class);
  114 +
  115 + Device secondDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + secondDeviceName, Device.class),
  116 + 20,
  117 + 100);
  118 +
114 119 assertNotNull(secondDevice);
115 120
  121 + Thread.sleep(2000);
  122 +
116 123 List<String> firstDeviceActualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/attributes/CLIENT_SCOPE", List.class);
117 124 Set<String> firstDeviceActualKeySet = new HashSet<>(firstDeviceActualKeys);
118 125
... ...
... ... @@ -88,10 +88,12 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
88 88 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
89 89 publishMqttMsg(client, payload.getBytes(), MqttTopics.GATEWAY_CONNECT_TOPIC);
90 90
91   - Thread.sleep(2000);
92   -
93 91 String deviceName = "Device A";
94   - Device device = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
  92 +
  93 + Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
  94 + 20,
  95 + 100);
  96 +
95 97 assertNotNull(device);
96 98 }
97 99
... ... @@ -139,13 +141,20 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
139 141
140 142 publishMqttMsg(client, payload, topic);
141 143
142   - Thread.sleep(2000);
  144 + Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class),
  145 + 20,
  146 + 100);
143 147
144   - Device firstDevice = doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class);
145 148 assertNotNull(firstDevice);
146   - Device secondDevice = doGet("/api/tenant/devices?deviceName=" + secondDeviceName, Device.class);
  149 +
  150 + Device secondDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + secondDeviceName, Device.class),
  151 + 20,
  152 + 100);
  153 +
147 154 assertNotNull(secondDevice);
148 155
  156 + Thread.sleep(2000);
  157 +
149 158 List<String> firstDeviceActualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/timeseries", List.class);
150 159 Set<String> firstDeviceActualKeySet = new HashSet<>(firstDeviceActualKeys);
151 160
... ...
... ... @@ -73,10 +73,10 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
73 73 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
74 74 publishMqttMsg(client, payload.getBytes(), MqttTopics.GATEWAY_CONNECT_TOPIC);
75 75
76   - Thread.sleep(2000);
77   -
78 76 String deviceName = "Device A";
79   - Device device = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
  77 + Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
  78 + 20,
  79 + 100);
80 80 assertNotNull(device);
81 81 }
82 82 }
... ...
... ... @@ -81,9 +81,10 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
81 81 MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
82 82 publishMqttMsg(client, connectMsgProto.toByteArray(), MqttTopics.GATEWAY_CONNECT_TOPIC);
83 83
84   - Thread.sleep(2000);
  84 + Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
  85 + 20,
  86 + 100);
85 87
86   - Device device = doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class);
87 88 assertNotNull(device);
88 89 }
89 90
... ...