Commit 2cccbc7dca182c0456f50e634b61cef1ec93f9ff

Authored by Igor Kulikov
1 parent aa58338c

MQTT tests: fix CountDown latch

@@ -103,10 +103,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -103,10 +103,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
103 options.setUserName(accessToken); 103 options.setUserName(accessToken);
104 client.connect(options).waitForCompletion(); 104 client.connect(options).waitForCompletion();
105 105
106 - TestMqttCallback callback = new TestMqttCallback(client);  
107 - client.setCallback(callback);  
108 CountDownLatch latch = new CountDownLatch(1); 106 CountDownLatch latch = new CountDownLatch(1);
109 - latch.countDown(); 107 + TestMqttCallback callback = new TestMqttCallback(client, latch);
  108 + client.setCallback(callback);
  109 +
110 client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value()); 110 client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value());
111 111
112 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; 112 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
@@ -163,7 +163,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -163,7 +163,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
163 options.setUserName(accessToken); 163 options.setUserName(accessToken);
164 client.connect(options).waitForCompletion(); 164 client.connect(options).waitForCompletion();
165 client.subscribe("v1/devices/me/rpc/request/+", 1); 165 client.subscribe("v1/devices/me/rpc/request/+", 1);
166 - client.setCallback(new TestMqttCallback(client)); 166 + client.setCallback(new TestMqttCallback(client, new CountDownLatch(1)));
167 167
168 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; 168 String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
169 String deviceId = savedDevice.getId().getId().toString(); 169 String deviceId = savedDevice.getId().getId().toString();
@@ -211,10 +211,12 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -211,10 +211,12 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
211 private static class TestMqttCallback implements MqttCallback { 211 private static class TestMqttCallback implements MqttCallback {
212 212
213 private final MqttAsyncClient client; 213 private final MqttAsyncClient client;
  214 + private final CountDownLatch latch;
214 private Integer qoS; 215 private Integer qoS;
215 216
216 - TestMqttCallback(MqttAsyncClient client) { 217 + TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) {
217 this.client = client; 218 this.client = client;
  219 + this.latch = latch;
218 } 220 }
219 221
220 int getQoS() { 222 int getQoS() {
@@ -233,6 +235,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @@ -233,6 +235,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC
233 message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8")); 235 message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8"));
234 qoS = mqttMessage.getQos(); 236 qoS = mqttMessage.getQos();
235 client.publish(responseTopic, message); 237 client.publish(responseTopic, message);
  238 + latch.countDown();
236 } 239 }
237 240
238 @Override 241 @Override
@@ -105,10 +105,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr @@ -105,10 +105,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
105 MqttConnectOptions options = new MqttConnectOptions(); 105 MqttConnectOptions options = new MqttConnectOptions();
106 options.setUserName(accessToken); 106 options.setUserName(accessToken);
107 client.connect(options).waitForCompletion(3000); 107 client.connect(options).waitForCompletion(3000);
108 - TestMqttCallback callback = new TestMqttCallback(client);  
109 - client.setCallback(callback);  
110 CountDownLatch latch = new CountDownLatch(1); 108 CountDownLatch latch = new CountDownLatch(1);
111 - latch.countDown(); 109 + TestMqttCallback callback = new TestMqttCallback(client, latch);
  110 + client.setCallback(callback);
112 client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value()); 111 client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value());
113 String payload = "{\"key\":\"value\"}"; 112 String payload = "{\"key\":\"value\"}";
114 String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); 113 String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
@@ -120,6 +119,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr @@ -120,6 +119,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
120 private static class TestMqttCallback implements MqttCallback { 119 private static class TestMqttCallback implements MqttCallback {
121 120
122 private final MqttAsyncClient client; 121 private final MqttAsyncClient client;
  122 + private final CountDownLatch latch;
123 private Integer qoS; 123 private Integer qoS;
124 private String payload; 124 private String payload;
125 125
@@ -127,8 +127,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr @@ -127,8 +127,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
127 return payload; 127 return payload;
128 } 128 }
129 129
130 - TestMqttCallback(MqttAsyncClient client) { 130 + TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) {
131 this.client = client; 131 this.client = client;
  132 + this.latch = latch;
132 } 133 }
133 134
134 int getQoS() { 135 int getQoS() {
@@ -143,6 +144,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr @@ -143,6 +144,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr
143 public void messageArrived(String requestTopic, MqttMessage mqttMessage) { 144 public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
144 payload = new String(mqttMessage.getPayload()); 145 payload = new String(mqttMessage.getPayload());
145 qoS = mqttMessage.getQos(); 146 qoS = mqttMessage.getQos();
  147 + latch.countDown();
146 } 148 }
147 149
148 @Override 150 @Override