Showing
2 changed files
with
421 additions
and
0 deletions
@@ -15,11 +15,15 @@ | @@ -15,11 +15,15 @@ | ||
15 | */ | 15 | */ |
16 | package org.thingsboard.server.msa; | 16 | package org.thingsboard.server.msa; |
17 | 17 | ||
18 | +import com.fasterxml.jackson.core.JsonProcessingException; | ||
19 | +import com.fasterxml.jackson.databind.JsonMappingException; | ||
20 | +import com.fasterxml.jackson.databind.JsonNode; | ||
18 | import com.fasterxml.jackson.databind.ObjectMapper; | 21 | import com.fasterxml.jackson.databind.ObjectMapper; |
19 | import com.google.common.collect.ImmutableMap; | 22 | import com.google.common.collect.ImmutableMap; |
20 | import com.google.gson.JsonArray; | 23 | import com.google.gson.JsonArray; |
21 | import com.google.gson.JsonObject; | 24 | import com.google.gson.JsonObject; |
22 | import lombok.extern.slf4j.Slf4j; | 25 | import lombok.extern.slf4j.Slf4j; |
26 | +import org.apache.cassandra.cql3.Json; | ||
23 | import org.apache.commons.lang3.RandomStringUtils; | 27 | import org.apache.commons.lang3.RandomStringUtils; |
24 | import org.apache.http.config.Registry; | 28 | import org.apache.http.config.Registry; |
25 | import org.apache.http.config.RegistryBuilder; | 29 | import org.apache.http.config.RegistryBuilder; |
@@ -32,6 +36,7 @@ import org.apache.http.impl.client.HttpClients; | @@ -32,6 +36,7 @@ import org.apache.http.impl.client.HttpClients; | ||
32 | import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; | 36 | import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; |
33 | import org.apache.http.ssl.SSLContextBuilder; | 37 | import org.apache.http.ssl.SSLContextBuilder; |
34 | import org.apache.http.ssl.SSLContexts; | 38 | import org.apache.http.ssl.SSLContexts; |
39 | +import org.json.simple.JSONObject; | ||
35 | import org.junit.BeforeClass; | 40 | import org.junit.BeforeClass; |
36 | import org.junit.Rule; | 41 | import org.junit.Rule; |
37 | import org.junit.rules.TestRule; | 42 | import org.junit.rules.TestRule; |
@@ -42,6 +47,7 @@ import org.thingsboard.rest.client.RestClient; | @@ -42,6 +47,7 @@ import org.thingsboard.rest.client.RestClient; | ||
42 | import org.thingsboard.server.common.data.Device; | 47 | import org.thingsboard.server.common.data.Device; |
43 | import org.thingsboard.server.common.data.EntityType; | 48 | import org.thingsboard.server.common.data.EntityType; |
44 | import org.thingsboard.server.common.data.id.DeviceId; | 49 | import org.thingsboard.server.common.data.id.DeviceId; |
50 | +import org.thingsboard.server.common.data.security.DeviceCredentials; | ||
45 | import org.thingsboard.server.msa.mapper.WsTelemetryResponse; | 51 | import org.thingsboard.server.msa.mapper.WsTelemetryResponse; |
46 | 52 | ||
47 | 53 | ||
@@ -52,6 +58,7 @@ import java.net.URI; | @@ -52,6 +58,7 @@ import java.net.URI; | ||
52 | import java.security.cert.X509Certificate; | 58 | import java.security.cert.X509Certificate; |
53 | import java.util.List; | 59 | import java.util.List; |
54 | import java.util.Map; | 60 | import java.util.Map; |
61 | +import java.util.Optional; | ||
55 | import java.util.Random; | 62 | import java.util.Random; |
56 | 63 | ||
57 | @Slf4j | 64 | @Slf4j |
@@ -95,6 +102,17 @@ public abstract class AbstractContainerTest { | @@ -95,6 +102,17 @@ public abstract class AbstractContainerTest { | ||
95 | } | 102 | } |
96 | }; | 103 | }; |
97 | 104 | ||
105 | + protected Device createGatewayDevice() throws JsonProcessingException { | ||
106 | + String isGateway = "{\"gateway\":true}"; | ||
107 | + ObjectMapper objectMapper = new ObjectMapper(); | ||
108 | + JsonNode additionalInfo = objectMapper.readTree(isGateway); | ||
109 | + Device gatewayDeviceTemplate = new Device(); | ||
110 | + gatewayDeviceTemplate.setName("mqtt_gateway"); | ||
111 | + gatewayDeviceTemplate.setType("gateway"); | ||
112 | + gatewayDeviceTemplate.setAdditionalInfo(additionalInfo); | ||
113 | + return restClient.saveDevice(gatewayDeviceTemplate); | ||
114 | + } | ||
115 | + | ||
98 | protected Device createDevice(String name) { | 116 | protected Device createDevice(String name) { |
99 | return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT"); | 117 | return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT"); |
100 | } | 118 | } |
@@ -140,6 +158,27 @@ public abstract class AbstractContainerTest { | @@ -140,6 +158,27 @@ public abstract class AbstractContainerTest { | ||
140 | return expectedValue.equals(list.get(1)); | 158 | return expectedValue.equals(list.get(1)); |
141 | } | 159 | } |
142 | 160 | ||
161 | + protected JsonObject createGatewayConnectPayload(String deviceName){ | ||
162 | + JsonObject payload = new JsonObject(); | ||
163 | + payload.addProperty("device", deviceName); | ||
164 | + return payload; | ||
165 | + } | ||
166 | + | ||
167 | + protected JsonObject createGatewayPayload(String deviceName, long ts){ | ||
168 | + JsonObject payload = new JsonObject(); | ||
169 | + payload.add(deviceName, createGatewayTelemetryArray(ts)); | ||
170 | + return payload; | ||
171 | + } | ||
172 | + | ||
173 | + protected JsonArray createGatewayTelemetryArray(long ts){ | ||
174 | + JsonArray telemetryArray = new JsonArray(); | ||
175 | + if (ts > 0) | ||
176 | + telemetryArray.add(createPayload(ts)); | ||
177 | + else | ||
178 | + telemetryArray.add(createPayload()); | ||
179 | + return telemetryArray; | ||
180 | + } | ||
181 | + | ||
143 | protected JsonObject createPayload(long ts) { | 182 | protected JsonObject createPayload(long ts) { |
144 | JsonObject values = createPayload(); | 183 | JsonObject values = createPayload(); |
145 | JsonObject payload = new JsonObject(); | 184 | JsonObject payload = new JsonObject(); |
msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.msa.connectivity; | ||
17 | + | ||
18 | +import com.fasterxml.jackson.databind.JsonNode; | ||
19 | +import com.fasterxml.jackson.databind.ObjectMapper; | ||
20 | +import com.google.common.collect.Sets; | ||
21 | +import com.google.common.util.concurrent.ListenableFuture; | ||
22 | +import com.google.common.util.concurrent.ListeningExecutorService; | ||
23 | +import com.google.common.util.concurrent.MoreExecutors; | ||
24 | +import com.google.gson.JsonElement; | ||
25 | +import com.google.gson.JsonObject; | ||
26 | +import com.google.gson.JsonParser; | ||
27 | +import io.netty.buffer.ByteBuf; | ||
28 | +import io.netty.buffer.Unpooled; | ||
29 | +import io.netty.handler.codec.mqtt.MqttQoS; | ||
30 | +import lombok.Data; | ||
31 | +import lombok.extern.slf4j.Slf4j; | ||
32 | +import org.apache.commons.lang3.RandomStringUtils; | ||
33 | +import org.checkerframework.checker.units.qual.A; | ||
34 | +import org.junit.After; | ||
35 | +import org.junit.Assert; | ||
36 | +import org.junit.Before; | ||
37 | +import org.junit.Test; | ||
38 | +import org.springframework.core.ParameterizedTypeReference; | ||
39 | +import org.springframework.http.HttpMethod; | ||
40 | +import org.springframework.http.ResponseEntity; | ||
41 | +import org.thingsboard.mqtt.MqttClient; | ||
42 | +import org.thingsboard.mqtt.MqttClientConfig; | ||
43 | +import org.thingsboard.mqtt.MqttHandler; | ||
44 | +import org.thingsboard.server.common.data.Device; | ||
45 | +import org.thingsboard.server.common.data.id.DeviceId; | ||
46 | +import org.thingsboard.server.common.data.id.EntityId; | ||
47 | +import org.thingsboard.server.common.data.id.RuleChainId; | ||
48 | +import org.thingsboard.server.common.data.page.TextPageData; | ||
49 | +import org.thingsboard.server.common.data.relation.EntityRelation; | ||
50 | +import org.thingsboard.server.common.data.relation.EntityRelationInfo; | ||
51 | +import org.thingsboard.server.common.data.relation.RelationTypeGroup; | ||
52 | +import org.thingsboard.server.common.data.rule.NodeConnectionInfo; | ||
53 | +import org.thingsboard.server.common.data.rule.RuleChain; | ||
54 | +import org.thingsboard.server.common.data.rule.RuleChainMetaData; | ||
55 | +import org.thingsboard.server.common.data.rule.RuleNode; | ||
56 | +import org.thingsboard.server.common.data.security.DeviceCredentials; | ||
57 | +import org.thingsboard.server.msa.AbstractContainerTest; | ||
58 | +import org.thingsboard.server.msa.WsClient; | ||
59 | +import org.thingsboard.server.msa.mapper.AttributesResponse; | ||
60 | +import org.thingsboard.server.msa.mapper.WsTelemetryResponse; | ||
61 | + | ||
62 | +import java.io.IOException; | ||
63 | +import java.nio.charset.StandardCharsets; | ||
64 | +import java.util.*; | ||
65 | +import java.util.concurrent.*; | ||
66 | + | ||
67 | +@Slf4j | ||
68 | +public class MqttGatewayClientTest extends AbstractContainerTest { | ||
69 | + Device gatewayDevice; | ||
70 | + MqttClient mqttClient; | ||
71 | + Device createdDevice; | ||
72 | + MqttMessageListener listener; | ||
73 | + | ||
74 | + @Before | ||
75 | + public void createGateway() throws Exception { | ||
76 | + restClient.login("tenant@thingsboard.org", "tenant"); | ||
77 | + this.gatewayDevice = createGatewayDevice(); | ||
78 | + Optional<DeviceCredentials> gatewayDeviceCredentials = restClient.getDeviceCredentialsByDeviceId(gatewayDevice.getId()); | ||
79 | + Assert.assertTrue(gatewayDeviceCredentials.isPresent()); | ||
80 | + this.listener = new MqttMessageListener(); | ||
81 | + this.mqttClient = getMqttClient(gatewayDeviceCredentials.get(), listener); | ||
82 | + this.createdDevice = createDeviceThroughGateway(mqttClient, gatewayDevice); | ||
83 | + } | ||
84 | + | ||
85 | + @After | ||
86 | + public void removeGateway() throws Exception { | ||
87 | + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + this.gatewayDevice.getId()); | ||
88 | + restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + this.createdDevice.getId()); | ||
89 | + this.listener = null; | ||
90 | + this.mqttClient = null; | ||
91 | + this.createdDevice = null; | ||
92 | + } | ||
93 | + | ||
94 | + @Test | ||
95 | + public void telemetryUpload() throws Exception { | ||
96 | + WsClient wsClient = subscribeToWebSocket(createdDevice.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); | ||
97 | + mqttClient.publish("v1/gateway/telemetry", Unpooled.wrappedBuffer(createGatewayPayload(createdDevice.getName(), -1).toString().getBytes())).get(); | ||
98 | + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); | ||
99 | + log.info("Received telemetry: {}", actualLatestTelemetry); | ||
100 | + wsClient.closeBlocking(); | ||
101 | + | ||
102 | + Assert.assertEquals(4, actualLatestTelemetry.getData().size()); | ||
103 | + Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"), | ||
104 | + actualLatestTelemetry.getLatestValues().keySet()); | ||
105 | + | ||
106 | + Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString())); | ||
107 | + Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1")); | ||
108 | + Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0))); | ||
109 | + Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73))); | ||
110 | + } | ||
111 | + | ||
112 | + @Test | ||
113 | + public void telemetryUploadWithTs() throws Exception { | ||
114 | + long ts = 1451649600512L; | ||
115 | + | ||
116 | + restClient.login("tenant@thingsboard.org", "tenant"); | ||
117 | + WsClient wsClient = subscribeToWebSocket(createdDevice.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); | ||
118 | + mqttClient.publish("v1/gateway/telemetry", Unpooled.wrappedBuffer(createGatewayPayload(createdDevice.getName(), ts).toString().getBytes())).get(); | ||
119 | + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); | ||
120 | + log.info("Received telemetry: {}", actualLatestTelemetry); | ||
121 | + wsClient.closeBlocking(); | ||
122 | + | ||
123 | + Assert.assertEquals(4, actualLatestTelemetry.getData().size()); | ||
124 | + Assert.assertEquals(getExpectedLatestValues(ts), actualLatestTelemetry.getLatestValues()); | ||
125 | + | ||
126 | + Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", ts, Boolean.TRUE.toString())); | ||
127 | + Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", ts, "value1")); | ||
128 | + Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", ts, Double.toString(42.0))); | ||
129 | + Assert.assertTrue(verify(actualLatestTelemetry, "longKey", ts, Long.toString(73))); | ||
130 | + } | ||
131 | + | ||
132 | + @Test | ||
133 | + public void publishAttributeUpdateToServer() throws Exception { | ||
134 | + Optional<DeviceCredentials> createdDeviceCredentials = restClient.getDeviceCredentialsByDeviceId(createdDevice.getId()); | ||
135 | + Assert.assertTrue(createdDeviceCredentials.isPresent()); | ||
136 | + WsClient wsClient = subscribeToWebSocket(createdDevice.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS); | ||
137 | + JsonObject clientAttributes = new JsonObject(); | ||
138 | + clientAttributes.addProperty("attr1", "value1"); | ||
139 | + clientAttributes.addProperty("attr2", true); | ||
140 | + clientAttributes.addProperty("attr3", 42.0); | ||
141 | + clientAttributes.addProperty("attr4", 73); | ||
142 | + JsonObject gatewayClientAttributes = new JsonObject(); | ||
143 | + gatewayClientAttributes.add(createdDevice.getName(), clientAttributes); | ||
144 | + mqttClient.publish("v1/gateway/attributes", Unpooled.wrappedBuffer(gatewayClientAttributes.toString().getBytes())).get(); | ||
145 | + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); | ||
146 | + log.info("Received attributes: {}", actualLatestTelemetry); | ||
147 | + wsClient.closeBlocking(); | ||
148 | + | ||
149 | + Assert.assertEquals(4, actualLatestTelemetry.getData().size()); | ||
150 | + Assert.assertEquals(Sets.newHashSet("attr1", "attr2", "attr3", "attr4"), | ||
151 | + actualLatestTelemetry.getLatestValues().keySet()); | ||
152 | + | ||
153 | + Assert.assertTrue(verify(actualLatestTelemetry, "attr1", "value1")); | ||
154 | + Assert.assertTrue(verify(actualLatestTelemetry, "attr2", Boolean.TRUE.toString())); | ||
155 | + Assert.assertTrue(verify(actualLatestTelemetry, "attr3", Double.toString(42.0))); | ||
156 | + Assert.assertTrue(verify(actualLatestTelemetry, "attr4", Long.toString(73))); | ||
157 | + } | ||
158 | + | ||
159 | + @Test | ||
160 | + public void requestAttributeValuesFromServer() throws Exception { | ||
161 | + WsClient wsClient = subscribeToWebSocket(createdDevice.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS); | ||
162 | + // Add a new client attribute | ||
163 | + JsonObject clientAttributes = new JsonObject(); | ||
164 | + String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8); | ||
165 | + clientAttributes.addProperty("clientAttr", clientAttributeValue); | ||
166 | + | ||
167 | + JsonObject gatewayClientAttributes = new JsonObject(); | ||
168 | + gatewayClientAttributes.add(createdDevice.getName(), clientAttributes); | ||
169 | + mqttClient.publish("v1/gateway/attributes", Unpooled.wrappedBuffer(gatewayClientAttributes.toString().getBytes())).get(); | ||
170 | + | ||
171 | + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); | ||
172 | + log.info("Received ws telemetry: {}", actualLatestTelemetry); | ||
173 | + wsClient.closeBlocking(); | ||
174 | + | ||
175 | + Assert.assertEquals(1, actualLatestTelemetry.getData().size()); | ||
176 | + Assert.assertEquals(Sets.newHashSet("clientAttr"), | ||
177 | + actualLatestTelemetry.getLatestValues().keySet()); | ||
178 | + | ||
179 | + Assert.assertTrue(verify(actualLatestTelemetry, "clientAttr", clientAttributeValue)); | ||
180 | + | ||
181 | + // Add a new shared attribute | ||
182 | + JsonObject sharedAttributes = new JsonObject(); | ||
183 | + String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8); | ||
184 | + sharedAttributes.addProperty("sharedAttr", sharedAttributeValue); | ||
185 | + | ||
186 | + ResponseEntity sharedAttributesResponse = restClient.getRestTemplate() | ||
187 | + .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE", | ||
188 | + mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, | ||
189 | + createdDevice.getId()); | ||
190 | + Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); | ||
191 | + MqttEvent sharedAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS); | ||
192 | + | ||
193 | + // Catch attribute update event | ||
194 | + Assert.assertNotNull(sharedAttributeEvent); | ||
195 | + Assert.assertEquals("v1/gateway/attributes", sharedAttributeEvent.getTopic()); | ||
196 | + | ||
197 | + // Subscribe to attributes response | ||
198 | + mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); | ||
199 | + | ||
200 | + // Wait until subscription is processed | ||
201 | + TimeUnit.SECONDS.sleep(3); | ||
202 | + | ||
203 | + checkAttribute(true, clientAttributeValue); | ||
204 | + checkAttribute(false, sharedAttributeValue); | ||
205 | + } | ||
206 | + | ||
207 | + @Test | ||
208 | + public void subscribeToAttributeUpdatesFromServer() throws Exception { | ||
209 | + mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); | ||
210 | + // Wait until subscription is processed | ||
211 | + TimeUnit.SECONDS.sleep(3); | ||
212 | + String sharedAttributeName = "sharedAttr"; | ||
213 | + // Add a new shared attribute | ||
214 | + | ||
215 | + JsonObject sharedAttributes = new JsonObject(); | ||
216 | + String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8); | ||
217 | + sharedAttributes.addProperty(sharedAttributeName, sharedAttributeValue); | ||
218 | + | ||
219 | + JsonObject gatewaySharedAttributeValue = new JsonObject(); | ||
220 | + gatewaySharedAttributeValue.addProperty("device", createdDevice.getName()); | ||
221 | + gatewaySharedAttributeValue.add("data", sharedAttributes); | ||
222 | + | ||
223 | + ResponseEntity sharedAttributesResponse = restClient.getRestTemplate() | ||
224 | + .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE", | ||
225 | + mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, | ||
226 | + createdDevice.getId()); | ||
227 | + Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); | ||
228 | + | ||
229 | + MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); | ||
230 | + Assert.assertEquals(sharedAttributeValue, | ||
231 | + mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); | ||
232 | + | ||
233 | + // Update the shared attribute value | ||
234 | + JsonObject updatedSharedAttributes = new JsonObject(); | ||
235 | + String updatedSharedAttributeValue = RandomStringUtils.randomAlphanumeric(8); | ||
236 | + updatedSharedAttributes.addProperty(sharedAttributeName, updatedSharedAttributeValue); | ||
237 | + | ||
238 | + JsonObject gatewayUpdatedSharedAttributeValue = new JsonObject(); | ||
239 | + gatewayUpdatedSharedAttributeValue.addProperty("device", createdDevice.getName()); | ||
240 | + gatewayUpdatedSharedAttributeValue.add("data", updatedSharedAttributes); | ||
241 | + | ||
242 | + ResponseEntity updatedSharedAttributesResponse = restClient.getRestTemplate() | ||
243 | + .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE", | ||
244 | + mapper.readTree(updatedSharedAttributes.toString()), ResponseEntity.class, | ||
245 | + createdDevice.getId()); | ||
246 | + Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); | ||
247 | + | ||
248 | + event = listener.getEvents().poll(10, TimeUnit.SECONDS); | ||
249 | + Assert.assertEquals(updatedSharedAttributeValue, | ||
250 | + mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); | ||
251 | + } | ||
252 | + | ||
253 | + @Test | ||
254 | + public void serverSideRpc() throws Exception { | ||
255 | + String gatewayRpcTopic = "v1/gateway/rpc"; | ||
256 | + mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get(); | ||
257 | + | ||
258 | + // Wait until subscription is processed | ||
259 | + TimeUnit.SECONDS.sleep(3); | ||
260 | + | ||
261 | + // Send an RPC from the server | ||
262 | + JsonObject serverRpcPayload = new JsonObject(); | ||
263 | + serverRpcPayload.addProperty("method", "getValue"); | ||
264 | + serverRpcPayload.addProperty("params", true); | ||
265 | + ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); | ||
266 | + ListenableFuture<ResponseEntity> future = service.submit(() -> { | ||
267 | + try { | ||
268 | + return restClient.getRestTemplate() | ||
269 | + .postForEntity(HTTPS_URL + "/api/plugins/rpc/twoway/{deviceId}", | ||
270 | + mapper.readTree(serverRpcPayload.toString()), String.class, | ||
271 | + createdDevice.getId()); | ||
272 | + } catch (IOException e) { | ||
273 | + return ResponseEntity.badRequest().build(); | ||
274 | + } | ||
275 | + }); | ||
276 | + | ||
277 | + // Wait for RPC call from the server and send the response | ||
278 | + MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); | ||
279 | + | ||
280 | + Assert.assertNotNull(requestFromServer); | ||
281 | + Assert.assertNotNull(requestFromServer.getMessage()); | ||
282 | + | ||
283 | + JsonObject requestFromServerJson = new JsonParser().parse(requestFromServer.getMessage()).getAsJsonObject(); | ||
284 | + | ||
285 | + Assert.assertEquals(createdDevice.getName(), requestFromServerJson.get("device").getAsString()); | ||
286 | + | ||
287 | + JsonObject requestFromServerData = requestFromServerJson.get("data").getAsJsonObject(); | ||
288 | + | ||
289 | + Assert.assertEquals("getValue", requestFromServerData.get("method").getAsString()); | ||
290 | + Assert.assertTrue(requestFromServerData.get("params").getAsBoolean()); | ||
291 | + | ||
292 | + int requestId = requestFromServerData.get("id").getAsInt(); | ||
293 | + | ||
294 | + JsonObject clientResponse = new JsonObject(); | ||
295 | + clientResponse.addProperty("response", "someResponse"); | ||
296 | + JsonObject gatewayResponse = new JsonObject(); | ||
297 | + gatewayResponse.addProperty("device", createdDevice.getName()); | ||
298 | + gatewayResponse.addProperty("id", requestId); | ||
299 | + gatewayResponse.add("data", clientResponse); | ||
300 | + // Send a response to the server's RPC request | ||
301 | + | ||
302 | + mqttClient.publish(gatewayRpcTopic, Unpooled.wrappedBuffer(gatewayResponse.toString().getBytes())).get(); | ||
303 | + ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); | ||
304 | + Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); | ||
305 | + Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); | ||
306 | + } | ||
307 | + | ||
308 | + private void checkAttribute(boolean client, String expectedValue) throws Exception{ | ||
309 | + JsonObject gatewayAttributesRequest = new JsonObject(); | ||
310 | + int messageId = new Random().nextInt(100); | ||
311 | + gatewayAttributesRequest.addProperty("id", messageId); | ||
312 | + gatewayAttributesRequest.addProperty("device", createdDevice.getName()); | ||
313 | + gatewayAttributesRequest.addProperty("client", client); | ||
314 | + String attributeName; | ||
315 | + if (client) | ||
316 | + attributeName = "clientAttr"; | ||
317 | + else | ||
318 | + attributeName = "sharedAttr"; | ||
319 | + gatewayAttributesRequest.addProperty("key", attributeName); | ||
320 | + log.info(gatewayAttributesRequest.toString()); | ||
321 | + mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get(); | ||
322 | + MqttEvent clientAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS); | ||
323 | + Assert.assertNotNull(clientAttributeEvent); | ||
324 | + JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject(); | ||
325 | + | ||
326 | + Assert.assertEquals(messageId, responseMessage.get("id").getAsInt()); | ||
327 | + Assert.assertEquals(createdDevice.getName(), responseMessage.get("device").getAsString()); | ||
328 | + Assert.assertEquals(3, responseMessage.entrySet().size()); | ||
329 | + Assert.assertEquals(expectedValue, responseMessage.get("value").getAsString()); | ||
330 | + } | ||
331 | + | ||
332 | + private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception { | ||
333 | + String deviceName = "mqtt_device"; | ||
334 | + mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes())).get(); | ||
335 | + | ||
336 | + TimeUnit.SECONDS.sleep(3); | ||
337 | + List<EntityRelation> relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON); | ||
338 | + | ||
339 | + Assert.assertEquals(1, relations.size()); | ||
340 | + | ||
341 | + EntityId createdEntityId = relations.get(0).getTo(); | ||
342 | + DeviceId createdDeviceId = new DeviceId(createdEntityId.getId()); | ||
343 | + Optional<Device> createdDevice = restClient.getDeviceById(createdDeviceId); | ||
344 | + | ||
345 | + Assert.assertTrue(createdDevice.isPresent()); | ||
346 | + | ||
347 | + return createdDevice.get(); | ||
348 | + } | ||
349 | + | ||
350 | + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { | ||
351 | + MqttClientConfig clientConfig = new MqttClientConfig(); | ||
352 | + clientConfig.setClientId("MQTT client from test"); | ||
353 | + clientConfig.setUsername(deviceCredentials.getCredentialsId()); | ||
354 | + MqttClient mqttClient = MqttClient.create(clientConfig, listener); | ||
355 | + mqttClient.connect("localhost", 1883).get(); | ||
356 | + return mqttClient; | ||
357 | + } | ||
358 | + | ||
359 | + @Data | ||
360 | + private class MqttMessageListener implements MqttHandler { | ||
361 | + private final BlockingQueue<MqttEvent> events; | ||
362 | + | ||
363 | + private MqttMessageListener() { | ||
364 | + events = new ArrayBlockingQueue<>(100); | ||
365 | + } | ||
366 | + | ||
367 | + @Override | ||
368 | + public void onMessage(String topic, ByteBuf message) { | ||
369 | + log.info("MQTT message [{}], topic [{}]", message.toString(StandardCharsets.UTF_8), topic); | ||
370 | + events.add(new MqttEvent(topic, message.toString(StandardCharsets.UTF_8))); | ||
371 | + } | ||
372 | + | ||
373 | + } | ||
374 | + | ||
375 | + @Data | ||
376 | + private class MqttEvent { | ||
377 | + private final String topic; | ||
378 | + private final String message; | ||
379 | + } | ||
380 | + | ||
381 | + | ||
382 | +} |