Commit f97d74d6be12bfbfafe1e6e48f26520c612fb745

Authored by Andrii Shvaika
1 parent 44f00eb0

Improved WS API

@@ -52,6 +52,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; @@ -52,6 +52,7 @@ import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
52 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd; 52 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
53 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; 53 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
54 import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd; 54 import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
  55 +import org.thingsboard.server.service.telemetry.cmd.v2.GetTsCmd;
55 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd; 56 import org.thingsboard.server.service.telemetry.cmd.v2.LatestValueCmd;
56 import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd; 57 import org.thingsboard.server.service.telemetry.cmd.v2.TimeSeriesCmd;
57 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; 58 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
@@ -272,50 +273,86 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -272,50 +273,86 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
272 } 273 }
273 } 274 }
274 275
275 - private void handleTimeSeriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd cmd) {  
276 - List<String> keys = cmd.getKeys(); 276 + private ListenableFuture<TbEntityDataSubCtx> handleTimeSeriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd cmd) {
277 log.debug("[{}][{}] Fetching time-series data for last {} ms for keys: ({})", ctx.getSessionId(), ctx.getCmdId(), cmd.getTimeWindow(), cmd.getKeys()); 277 log.debug("[{}][{}] Fetching time-series data for last {} ms for keys: ({})", ctx.getSessionId(), ctx.getCmdId(), cmd.getTimeWindow(), cmd.getKeys());
278 - long startTs = cmd.getStartTs();  
279 - long endTs = cmd.getStartTs() + cmd.getTimeWindow();  
280 -  
281 - Map<EntityData, ListenableFuture<Map<String, List<TsValue>>>> tsFutures = new HashMap<>();  
282 - for (EntityData entityData : ctx.getData().getData()) {  
283 - List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(),  
284 - getLimit(cmd.getLimit()), DefaultTelemetryWebSocketService.getAggregation(cmd.getAgg()))).collect(Collectors.toList());  
285 - ListenableFuture<List<TsKvEntry>> tsDataFutures = tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), queries);  
286 - tsFutures.put(entityData, Futures.transform(tsDataFutures, this::toTsValues, MoreExecutors.directExecutor())); 278 + return handleGetTsCmd(ctx, cmd, true);
  279 + }
  280 +
  281 +
  282 + private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd cmd) {
  283 + log.debug("[{}][{}] Fetching history data for start {} and end {} ms for keys: ({})", ctx.getSessionId(), ctx.getCmdId(), cmd.getStartTs(), cmd.getEndTs(), cmd.getKeys());
  284 + return handleGetTsCmd(ctx, cmd, false);
  285 + }
  286 +
  287 + private ListenableFuture<TbEntityDataSubCtx> handleGetTsCmd(TbEntityDataSubCtx ctx, GetTsCmd cmd, boolean subscribe) {
  288 + List<String> keys = cmd.getKeys();
  289 + List<ReadTsKvQuery> finalTsKvQueryList;
  290 + List<ReadTsKvQuery> tsKvQueryList = cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
  291 + key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg()
  292 + )).collect(Collectors.toList());
  293 + if (cmd.isFetchLatestPreviousPoint()) {
  294 + finalTsKvQueryList = new ArrayList<>(tsKvQueryList);
  295 + tsKvQueryList.addAll(cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
  296 + key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()
  297 + )).collect(Collectors.toList()));
  298 + } else {
  299 + finalTsKvQueryList = tsKvQueryList;
287 } 300 }
288 - Futures.addCallback(Futures.allAsList(tsFutures.values()), new FutureCallback<List<Map<String, List<TsValue>>>>() {  
289 - @Override  
290 - public void onSuccess(@Nullable List<Map<String, List<TsValue>>> result) {  
291 - tsFutures.forEach((key, value) -> {  
292 - try {  
293 - value.get().forEach((k, v) -> key.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));  
294 - } catch (InterruptedException | ExecutionException e) {  
295 - log.warn("[{}][{}] Failed to lookup time-series data: {}:{}", ctx.getSessionId(), ctx.getCmdId(), key.getEntityId(), keys, e); 301 + Map<EntityData, ListenableFuture<List<TsKvEntry>>> fetchResultMap = new HashMap<>();
  302 + ctx.getData().getData().forEach(entityData -> fetchResultMap.put(entityData,
  303 + tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), finalTsKvQueryList)));
  304 + return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> {
  305 + fetchResultMap.forEach((entityData, future) -> {
  306 + Map<String, List<TsValue>> keyData = new LinkedHashMap<>();
  307 + cmd.getKeys().forEach(key -> keyData.put(key, new ArrayList<>()));
  308 + try {
  309 + List<TsKvEntry> entityTsData = future.get();
  310 + if (entityTsData != null) {
  311 + entityTsData.forEach(entry -> keyData.get(entry.getKey()).add(new TsValue(entry.getTs(), entry.getValueAsString())));
296 } 312 }
297 - });  
298 - EntityDataUpdate update;  
299 - if (!ctx.isInitialDataSent()) {  
300 - update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);  
301 - ctx.setInitialDataSent(true);  
302 - } else {  
303 - update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData()); 313 + keyData.forEach((k, v) -> entityData.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));
  314 + if (cmd.isFetchLatestPreviousPoint()) {
  315 + entityData.getTimeseries().values().forEach(dataArray -> {
  316 + Arrays.sort(dataArray, (o1, o2) -> Long.compare(o2.getTs(), o1.getTs()));
  317 + });
  318 + }
  319 + } catch (InterruptedException | ExecutionException e) {
  320 + log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
  321 + wsService.sendWsMsg(ctx.getSessionId(),
  322 + new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
304 } 323 }
305 - wsService.sendWsMsg(ctx.getSessionId(), update);  
306 - createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), false);  
307 - ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear()); 324 + });
  325 + EntityDataUpdate update;
  326 + if (!ctx.isInitialDataSent()) {
  327 + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
  328 + ctx.setInitialDataSent(true);
  329 + } else {
  330 + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
308 } 331 }
309 -  
310 - @Override  
311 - public void onFailure(Throwable t) {  
312 - log.warn("[{}][{}] Failed to process websocket command: {}:{}", ctx.getSessionId(), ctx.getCmdId(), ctx.getQuery(), cmd, t);  
313 - wsService.sendWsMsg(ctx.getSessionId(),  
314 - new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!")); 332 + wsService.sendWsMsg(ctx.getSessionId(), update);
  333 + if (subscribe) {
  334 + createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()), false);
315 } 335 }
  336 + ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear());
  337 + return ctx;
316 }, wsCallBackExecutor); 338 }, wsCallBackExecutor);
317 } 339 }
318 340
  341 + private List<ReadTsKvQuery> getReadTsKvQueries(GetTsCmd cmd) {
  342 + List<ReadTsKvQuery> finalTsKvQueryList;
  343 + List<ReadTsKvQuery> queries = cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(),
  344 + getLimit(cmd.getLimit()), cmd.getAgg())).collect(Collectors.toList());
  345 + if (cmd.isFetchLatestPreviousPoint()) {
  346 + finalTsKvQueryList = new ArrayList<>(queries);
  347 + finalTsKvQueryList.addAll(cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
  348 + key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()
  349 + )).collect(Collectors.toList()));
  350 + } else {
  351 + finalTsKvQueryList = queries;
  352 + }
  353 + return finalTsKvQueryList;
  354 + }
  355 +
319 private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) { 356 private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
320 log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd); 357 log.trace("[{}][{}] Going to process latest command: {}", ctx.getSessionId(), ctx.getCmdId(), latestCmd);
321 //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode. 358 //Fetch the latest values for telemetry keys (in case they are not copied from NoSQL to SQL DB in hybrid mode.
@@ -400,56 +437,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc @@ -400,56 +437,6 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
400 return results; 437 return results;
401 } 438 }
402 439
403 - private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd historyCmd) {  
404 - List<ReadTsKvQuery> finalTsKvQueryList;  
405 - List<ReadTsKvQuery> tsKvQueryList = historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(  
406 - key, historyCmd.getStartTs(), historyCmd.getEndTs(), historyCmd.getInterval(), getLimit(historyCmd.getLimit()), historyCmd.getAgg()  
407 - )).collect(Collectors.toList());  
408 - if (historyCmd.isFetchLatestPreviousPoint()) {  
409 - finalTsKvQueryList = new ArrayList<>(tsKvQueryList);  
410 - tsKvQueryList.addAll(historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(  
411 - key, historyCmd.getStartTs() - TimeUnit.DAYS.toMillis(365), historyCmd.getStartTs(), historyCmd.getInterval(), 1, historyCmd.getAgg()  
412 - )).collect(Collectors.toList()));  
413 - } else {  
414 - finalTsKvQueryList = tsKvQueryList;  
415 - }  
416 - Map<EntityData, ListenableFuture<List<TsKvEntry>>> fetchResultMap = new HashMap<>();  
417 - ctx.getData().getData().forEach(entityData -> fetchResultMap.put(entityData,  
418 - tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), finalTsKvQueryList)));  
419 - return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> {  
420 - fetchResultMap.forEach((entityData, future) -> {  
421 - Map<String, List<TsValue>> keyData = new LinkedHashMap<>();  
422 - historyCmd.getKeys().forEach(key -> keyData.put(key, new ArrayList<>()));  
423 - try {  
424 - List<TsKvEntry> entityTsData = future.get();  
425 - if (entityTsData != null) {  
426 - entityTsData.forEach(entry -> keyData.get(entry.getKey()).add(new TsValue(entry.getTs(), entry.getValueAsString())));  
427 - }  
428 - keyData.forEach((k, v) -> entityData.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));  
429 - if (historyCmd.isFetchLatestPreviousPoint()) {  
430 - entityData.getTimeseries().values().forEach(dataArray -> {  
431 - Arrays.sort(dataArray, (o1, o2) -> Long.compare(o2.getTs(), o1.getTs()));  
432 - });  
433 - }  
434 - } catch (InterruptedException | ExecutionException e) {  
435 - log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);  
436 - wsService.sendWsMsg(ctx.getSessionId(),  
437 - new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));  
438 - }  
439 - });  
440 - EntityDataUpdate update;  
441 - if (!ctx.isInitialDataSent()) {  
442 - update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);  
443 - ctx.setInitialDataSent(true);  
444 - } else {  
445 - update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());  
446 - }  
447 - wsService.sendWsMsg(ctx.getSessionId(), update);  
448 - ctx.getData().getData().forEach(ed -> ed.getTimeseries().clear());  
449 - return ctx;  
450 - }, wsCallBackExecutor);  
451 - }  
452 -  
453 @Override 440 @Override
454 public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd cmd) { 441 public void cancelSubscription(String sessionId, EntityDataUnsubscribeCmd cmd) {
455 cleanupAndCancel(getSubCtx(sessionId, cmd.getCmdId())); 442 cleanupAndCancel(getSubCtx(sessionId, cmd.getCmdId()));
@@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.kv.Aggregation; @@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.kv.Aggregation;
21 import java.util.List; 21 import java.util.List;
22 22
23 @Data 23 @Data
24 -public class EntityHistoryCmd { 24 +public class EntityHistoryCmd implements GetTsCmd {
25 25
26 private List<String> keys; 26 private List<String> keys;
27 private long startTs; 27 private long startTs;
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.telemetry.cmd.v2;
  17 +
  18 +import org.thingsboard.server.common.data.kv.Aggregation;
  19 +
  20 +import java.util.List;
  21 +
  22 +public interface GetTsCmd {
  23 +
  24 + long getStartTs();
  25 +
  26 + long getEndTs();
  27 +
  28 + List<String> getKeys();
  29 +
  30 + long getInterval();
  31 +
  32 + int getLimit();
  33 +
  34 + Aggregation getAgg();
  35 +
  36 + boolean isFetchLatestPreviousPoint();
  37 +
  38 +}
@@ -15,18 +15,26 @@ @@ -15,18 +15,26 @@
15 */ 15 */
16 package org.thingsboard.server.service.telemetry.cmd.v2; 16 package org.thingsboard.server.service.telemetry.cmd.v2;
17 17
  18 +import com.fasterxml.jackson.annotation.JsonIgnore;
18 import lombok.Data; 19 import lombok.Data;
  20 +import org.thingsboard.server.common.data.kv.Aggregation;
19 21
20 import java.util.List; 22 import java.util.List;
21 23
22 @Data 24 @Data
23 -public class TimeSeriesCmd { 25 +public class TimeSeriesCmd implements GetTsCmd {
24 26
25 private List<String> keys; 27 private List<String> keys;
26 private long startTs; 28 private long startTs;
27 private long timeWindow; 29 private long timeWindow;
28 private long interval; 30 private long interval;
29 private int limit; 31 private int limit;
30 - private String agg; 32 + private Aggregation agg;
  33 + private boolean fetchLatestPreviousPoint;
31 34
  35 + @JsonIgnore
  36 + @Override
  37 + public long getEndTs() {
  38 + return startTs + timeWindow;
  39 + }
32 } 40 }
@@ -194,7 +194,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -194,7 +194,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
194 194
195 TimeSeriesCmd tsCmd = new TimeSeriesCmd(); 195 TimeSeriesCmd tsCmd = new TimeSeriesCmd();
196 tsCmd.setKeys(Arrays.asList("temperature")); 196 tsCmd.setKeys(Arrays.asList("temperature"));
197 - tsCmd.setAgg(Aggregation.NONE.name()); 197 + tsCmd.setAgg(Aggregation.NONE);
198 tsCmd.setLimit(1000); 198 tsCmd.setLimit(1000);
199 tsCmd.setStartTs(now - TimeUnit.HOURS.toMillis(1)); 199 tsCmd.setStartTs(now - TimeUnit.HOURS.toMillis(1));
200 tsCmd.setTimeWindow(TimeUnit.HOURS.toMillis(1)); 200 tsCmd.setTimeWindow(TimeUnit.HOURS.toMillis(1));
@@ -561,16 +561,12 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { @@ -561,16 +561,12 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
561 wsClient.registerWaitForUpdate(); 561 wsClient.registerWaitForUpdate();
562 AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L)); 562 AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L));
563 List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1); 563 List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
564 - sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);  
565 -  
566 Thread.sleep(100); 564 Thread.sleep(100);
567 565
568 - cmd = new EntityDataCmd(1, edq, null, latestCmd, null);  
569 - wrapper = new TelemetryPluginCmdsWrapper();  
570 - wrapper.setEntityDataCmds(Collections.singletonList(cmd)); 566 + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, tsData);
571 567
572 msg = wsClient.waitForUpdate(); 568 msg = wsClient.waitForUpdate();
573 - 569 + Assert.assertNotNull(msg);
574 update = mapper.readValue(msg, EntityDataUpdate.class); 570 update = mapper.readValue(msg, EntityDataUpdate.class);
575 Assert.assertEquals(1, update.getCmdId()); 571 Assert.assertEquals(1, update.getCmdId());
576 List<EntityData> eData = update.getUpdate(); 572 List<EntityData> eData = update.getUpdate();