Commit be8460ad2b5464355a66f255bf868b6656e0d74d

Authored by Andrii Shvaika
1 parent 05174bb8

WS API improvement

@@ -212,8 +212,12 @@ public class TbEntityDataSubCtx { @@ -212,8 +212,12 @@ public class TbEntityDataSubCtx {
212 } 212 }
213 213
214 public Collection<Integer> clearSubscriptions() { 214 public Collection<Integer> clearSubscriptions() {
215 - List<Integer> oldSubIds = new ArrayList<>(subToEntityIdMap.keySet());  
216 - subToEntityIdMap.clear();  
217 - return oldSubIds; 215 + if (subToEntityIdMap != null) {
  216 + List<Integer> oldSubIds = new ArrayList<>(subToEntityIdMap.keySet());
  217 + subToEntityIdMap.clear();
  218 + return oldSubIds;
  219 + } else {
  220 + return Collections.emptyList();
  221 + }
218 } 222 }
219 } 223 }
@@ -159,6 +159,95 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -159,6 +159,95 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
159 } 159 }
160 160
161 @Test 161 @Test
  162 + public void testEntityDataLatestWidgetFlow() throws Exception {
  163 + Device device = new Device();
  164 + device.setName("Device");
  165 + device.setType("default");
  166 + device.setLabel("testLabel" + (int) (Math.random() * 1000));
  167 + device = doPost("/api/device", device, Device.class);
  168 +
  169 + long now = System.currentTimeMillis();
  170 +
  171 + DeviceTypeFilter dtf = new DeviceTypeFilter();
  172 + dtf.setDeviceNameFilter("D");
  173 + dtf.setDeviceType("default");
  174 + EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(),
  175 + Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")), Collections.emptyList());
  176 +
  177 + EntityDataCmd cmd = new EntityDataCmd(1, edq, null, null, null);
  178 +
  179 + TelemetryPluginCmdsWrapper wrapper = new TelemetryPluginCmdsWrapper();
  180 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  181 +
  182 + wsClient.send(mapper.writeValueAsString(wrapper));
  183 + String msg = wsClient.waitForReply();
  184 + EntityDataUpdate update = mapper.readValue(msg, EntityDataUpdate.class);
  185 + Assert.assertEquals(1, update.getCmdId());
  186 + PageData<EntityData> pageData = update.getData();
  187 + Assert.assertNotNull(pageData);
  188 + Assert.assertEquals(1, pageData.getData().size());
  189 + Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
  190 + Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature"));
  191 + Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature").getTs());
  192 + Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature").getValue());
  193 +
  194 + TsKvEntry dataPoint1 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("temperature", 42L));
  195 + List<TsKvEntry> tsData = Arrays.asList(dataPoint1);
  196 + sendTelemetry(device, tsData);
  197 +
  198 + Thread.sleep(100);
  199 +
  200 + LatestValueCmd latestCmd = new LatestValueCmd();
  201 + latestCmd.setKeys(Collections.singletonList(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")));
  202 + cmd = new EntityDataCmd(1, null, null, latestCmd, null);
  203 + wrapper = new TelemetryPluginCmdsWrapper();
  204 + wrapper.setEntityDataCmds(Collections.singletonList(cmd));
  205 +
  206 + wsClient.send(mapper.writeValueAsString(wrapper));
  207 + msg = wsClient.waitForReply();
  208 + update = mapper.readValue(msg, EntityDataUpdate.class);
  209 +
  210 + Assert.assertEquals(1, update.getCmdId());
  211 +
  212 + List<EntityData> listData = update.getUpdate();
  213 + Assert.assertNotNull(listData);
  214 + Assert.assertEquals(1, listData.size());
  215 + Assert.assertEquals(device.getId(), listData.get(0).getEntityId());
  216 + Assert.assertNotNull(listData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
  217 + TsValue tsValue = listData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
  218 + Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsValue);
  219 +
  220 + now = System.currentTimeMillis();
  221 + TsKvEntry dataPoint2 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 52L));
  222 +
  223 + wsClient.registerWaitForUpdate();
  224 + sendTelemetry(device, Arrays.asList(dataPoint2));
  225 + msg = wsClient.waitForUpdate();
  226 +
  227 + update = mapper.readValue(msg, EntityDataUpdate.class);
  228 + Assert.assertEquals(1, update.getCmdId());
  229 + List<EntityData> eData = update.getUpdate();
  230 + Assert.assertNotNull(eData);
  231 + Assert.assertEquals(1, eData.size());
  232 + Assert.assertEquals(device.getId(), eData.get(0).getEntityId());
  233 + Assert.assertNotNull(eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
  234 + tsValue = eData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
  235 + Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsValue);
  236 +
  237 + //Sending update from the past, while latest value has new timestamp;
  238 + wsClient.registerWaitForUpdate();
  239 + sendTelemetry(device, Arrays.asList(dataPoint1));
  240 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  241 + Assert.assertNull(msg);
  242 +
  243 + //Sending duplicate update again
  244 + wsClient.registerWaitForUpdate();
  245 + sendTelemetry(device, Arrays.asList(dataPoint2));
  246 + msg = wsClient.waitForUpdate(TimeUnit.SECONDS.toMillis(1));
  247 + Assert.assertNull(msg);
  248 + }
  249 +
  250 + @Test
162 public void testEntityDataLatestTsWsCmd() throws Exception { 251 public void testEntityDataLatestTsWsCmd() throws Exception {
163 Device device = new Device(); 252 Device device = new Device();
164 device.setName("Device"); 253 device.setName("Device");
@@ -54,12 +54,12 @@ public class TbTestWebSocketClient extends WebSocketClient { @@ -54,12 +54,12 @@ public class TbTestWebSocketClient extends WebSocketClient {
54 54
55 @Override 55 @Override
56 public void onClose(int i, String s, boolean b) { 56 public void onClose(int i, String s, boolean b) {
57 - 57 + log.error("CLOSED:");
58 } 58 }
59 59
60 @Override 60 @Override
61 public void onError(Exception e) { 61 public void onError(Exception e) {
62 - 62 + log.error("ERROR:", e);
63 } 63 }
64 64
65 public void registerWaitForUpdate() { 65 public void registerWaitForUpdate() {