Commit 9da51afecfef719104007c442dd158a4b56465ea

Authored by Andrii Shvaika
1 parent 9e848962

Improved WS API

... ... @@ -52,6 +52,8 @@ import org.thingsboard.server.queue.discovery.PartitionService;
52 52 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
53 53 import org.thingsboard.server.queue.util.TbCoreComponent;
54 54 import org.thingsboard.server.service.queue.TbClusterService;
  55 +import org.thingsboard.server.service.security.permission.Operation;
  56 +import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
55 57 import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
56 58 import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
57 59 import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
... ... @@ -211,21 +213,25 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
211 213 } else {
212 214 historyFuture = Futures.immediateFuture(ctx);
213 215 }
214   - if (cmd.getLatestCmd() != null) {
215   - Futures.addCallback(historyFuture, new FutureCallback<TbEntityDataSubCtx>() {
216   - @Override
217   - public void onSuccess(@Nullable TbEntityDataSubCtx theCtx) {
  216 + Futures.addCallback(historyFuture, new FutureCallback<TbEntityDataSubCtx>() {
  217 + @Override
  218 + public void onSuccess(@Nullable TbEntityDataSubCtx theCtx) {
  219 + if (cmd.getLatestCmd() != null) {
218 220 handleLatestCmd(theCtx, cmd.getLatestCmd());
  221 + } else if (cmd.getTsCmd() != null) {
  222 + handleTimeSeriesCmd(theCtx, cmd.getTsCmd());
  223 + } else if (!theCtx.isInitialDataSent()) {
  224 + EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null);
  225 + wsService.sendWsMsg(theCtx.getSessionId(), update);
  226 + theCtx.setInitialDataSent(true);
219 227 }
  228 + }
220 229
221   - @Override
222   - public void onFailure(Throwable t) {
223   - log.warn("[{}][{}] Failed to process command", session.getSessionId(), cmd.getCmdId());
224   - }
225   - }, wsCallBackExecutor);
226   - } else if (cmd.getTsCmd() != null) {
227   - handleTimeseriesCmd(ctx, cmd.getTsCmd());
228   - }
  230 + @Override
  231 + public void onFailure(Throwable t) {
  232 + log.warn("[{}][{}] Failed to process command", session.getSessionId(), cmd.getCmdId());
  233 + }
  234 + }, wsCallBackExecutor);
229 235 }
230 236
231 237 private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) {
... ... @@ -245,7 +251,47 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
245 251 }
246 252 }
247 253
248   - private void handleTimeseriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd tsCmd) {
  254 + private void handleTimeSeriesCmd(TbEntityDataSubCtx ctx, TimeSeriesCmd cmd) {
  255 + List<String> keys = cmd.getKeys();
  256 + log.debug("[{}][{}] Fetching time-series data for last {} ms for keys: ({})", ctx.getSessionId(), ctx.getCmdId(), cmd.getTimeWindow(), cmd.getKeys());
  257 + long startTs = cmd.getStartTs();
  258 + long endTs = cmd.getStartTs() + cmd.getTimeWindow();
  259 +
  260 + Map<EntityData, ListenableFuture<Map<String, List<TsValue>>>> tsFutures = new HashMap<>();
  261 + for (EntityData entityData : ctx.getData().getData()) {
  262 + List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(),
  263 + getLimit(cmd.getLimit()), DefaultTelemetryWebSocketService.getAggregation(cmd.getAgg()))).collect(Collectors.toList());
  264 + ListenableFuture<List<TsKvEntry>> tsDataFutures = tsService.findAll(ctx.getTenantId(), entityData.getEntityId(), queries);
  265 + tsFutures.put(entityData, Futures.transform(tsDataFutures, this::toTsValues, MoreExecutors.directExecutor()));
  266 + }
  267 + Futures.addCallback(Futures.allAsList(tsFutures.values()), new FutureCallback<List<Map<String, List<TsValue>>>>() {
  268 + @Override
  269 + public void onSuccess(@Nullable List<Map<String, List<TsValue>>> result) {
  270 + tsFutures.forEach((key, value) -> {
  271 + try {
  272 + value.get().forEach((k, v) -> key.getTimeseries().put(k, v.toArray(new TsValue[v.size()])));
  273 + } catch (InterruptedException | ExecutionException e) {
  274 + log.warn("[{}][{}] Failed to lookup time-series data: {}:{}", ctx.getSessionId(), ctx.getCmdId(), key.getEntityId(), keys, e);
  275 + }
  276 + });
  277 + EntityDataUpdate update;
  278 + if (!ctx.isInitialDataSent()) {
  279 + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
  280 + ctx.setInitialDataSent(true);
  281 + } else {
  282 + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
  283 + }
  284 + wsService.sendWsMsg(ctx.getSessionId(), update);
  285 + createSubscriptions(ctx, keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).collect(Collectors.toList()));
  286 + }
  287 +
  288 + @Override
  289 + public void onFailure(Throwable t) {
  290 + log.warn("[{}][{}] Failed to process websocket command: {}:{}", ctx.getSessionId(), ctx.getCmdId(), ctx.getQuery(), cmd, t);
  291 + wsService.sendWsMsg(ctx.getSessionId(),
  292 + new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to process websocket command!"));
  293 + }
  294 + }, wsCallBackExecutor);
249 295 }
250 296
251 297 private void handleLatestCmd(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
... ... @@ -257,7 +303,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
257 303 .filter(key -> key.getType().equals(EntityKeyType.TIME_SERIES))
258 304 .map(EntityKey::getKey).collect(Collectors.toList());
259 305
260   - Map<EntityData, ListenableFuture<Map<String, TsValue>>> missingTelemetryFurutes = new HashMap<>();
  306 + Map<EntityData, ListenableFuture<Map<String, TsValue>>> missingTelemetryFutures = new HashMap<>();
261 307 for (EntityData entityData : ctx.getData().getData()) {
262 308 Map<EntityKeyType, Map<String, TsValue>> latestEntityData = entityData.getLatest();
263 309 Map<String, TsValue> tsEntityData = latestEntityData.get(EntityKeyType.TIME_SERIES);
... ... @@ -270,12 +316,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
270 316 }
271 317
272 318 ListenableFuture<List<TsKvEntry>> missingTsData = tsService.findLatest(ctx.getTenantId(), entityData.getEntityId(), missingTsKeys);
273   - missingTelemetryFurutes.put(entityData, Futures.transform(missingTsData, this::toTsValue, MoreExecutors.directExecutor()));
  319 + missingTelemetryFutures.put(entityData, Futures.transform(missingTsData, this::toTsValue, MoreExecutors.directExecutor()));
274 320 }
275   - Futures.addCallback(Futures.allAsList(missingTelemetryFurutes.values()), new FutureCallback<List<Map<String, TsValue>>>() {
  321 + Futures.addCallback(Futures.allAsList(missingTelemetryFutures.values()), new FutureCallback<List<Map<String, TsValue>>>() {
276 322 @Override
277 323 public void onSuccess(@Nullable List<Map<String, TsValue>> result) {
278   - missingTelemetryFurutes.forEach((key, value) -> {
  324 + missingTelemetryFutures.forEach((key, value) -> {
279 325 try {
280 326 key.getLatest().get(EntityKeyType.TIME_SERIES).putAll(value.get());
281 327 } catch (InterruptedException | ExecutionException e) {
... ... @@ -285,11 +331,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
285 331 EntityDataUpdate update;
286 332 if (!ctx.isInitialDataSent()) {
287 333 update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
  334 + ctx.setInitialDataSent(true);
288 335 } else {
289 336 update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
290 337 }
291 338 wsService.sendWsMsg(ctx.getSessionId(), update);
292   - createLatestSubscriptions(ctx, latestCmd);
  339 + createSubscriptions(ctx, latestCmd.getKeys());
293 340 }
294 341
295 342 @Override
... ... @@ -303,14 +350,15 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
303 350 if (!ctx.isInitialDataSent()) {
304 351 EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
305 352 wsService.sendWsMsg(ctx.getSessionId(), update);
  353 + ctx.setInitialDataSent(true);
306 354 }
307   - createLatestSubscriptions(ctx, latestCmd);
  355 + createSubscriptions(ctx, latestCmd.getKeys());
308 356 }
309 357 }
310 358
311   - private void createLatestSubscriptions(TbEntityDataSubCtx ctx, LatestValueCmd latestCmd) {
  359 + private void createSubscriptions(TbEntityDataSubCtx ctx, List<EntityKey> keys) {
312 360 //TODO: create context for this (session, cmdId) that contains query, latestCmd and update. Subscribe + periodic updates.
313   - List<TbSubscription> tbSubs = ctx.createSubscriptions(latestCmd.getKeys());
  361 + List<TbSubscription> tbSubs = ctx.createSubscriptions(keys);
314 362 tbSubs.forEach(sub -> localSubscriptionService.addSubscription(sub));
315 363 }
316 364
... ... @@ -318,6 +366,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
318 366 return data.stream().collect(Collectors.toMap(TsKvEntry::getKey, value -> new TsValue(value.getTs(), value.getValueAsString())));
319 367 }
320 368
  369 + private Map<String, List<TsValue>> toTsValues(List<TsKvEntry> data) {
  370 + Map<String, List<TsValue>> results = new HashMap<>();
  371 + for (TsKvEntry tsKvEntry : data) {
  372 + results.computeIfAbsent(tsKvEntry.getKey(), k -> new ArrayList<>()).add(new TsValue(tsKvEntry.getTs(), tsKvEntry.getValueAsString()));
  373 + }
  374 + return results;
  375 + }
  376 +
321 377 private ListenableFuture<TbEntityDataSubCtx> handleHistoryCmd(TbEntityDataSubCtx ctx, EntityHistoryCmd historyCmd) {
322 378 List<ReadTsKvQuery> tsKvQueryList = historyCmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
323 379 key, historyCmd.getStartTs(), historyCmd.getEndTs(), historyCmd.getInterval(), getLimit(historyCmd.getLimit()), historyCmd.getAgg()
... ...
... ... @@ -657,11 +657,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
657 657 "Query is empty!");
658 658 sendWsMsg(sessionRef, update);
659 659 return false;
660   - } else if (cmd.getHistoryCmd() == null && cmd.getLatestCmd() == null && cmd.getTsCmd() == null) {
661   - SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
662   - "No history, latest or timeseries command present!");
663   - sendWsMsg(sessionRef, update);
664   - return false;
665 660 }
666 661 return true;
667 662 }
... ... @@ -823,7 +818,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
823 818 }
824 819
825 820
826   - private static Aggregation getAggregation(String agg) {
  821 + public static Aggregation getAggregation(String agg) {
827 822 return StringUtils.isEmpty(agg) ? DEFAULT_AGGREGATION : Aggregation.valueOf(agg);
828 823 }
829 824
... ...
... ... @@ -21,7 +21,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
21 21 import org.junit.After;
22 22 import org.junit.Assert;
23 23 import org.junit.Before;
24   -import org.junit.Ignore;
25 24 import org.junit.Test;
26 25 import org.springframework.beans.factory.annotation.Autowired;
27 26 import org.thingsboard.server.common.data.Device;
... ... @@ -42,7 +41,6 @@ import org.thingsboard.server.common.data.query.EntityKey;
42 41 import org.thingsboard.server.common.data.query.EntityKeyType;
43 42 import org.thingsboard.server.common.data.query.TsValue;
44 43 import org.thingsboard.server.common.data.security.Authority;
45   -import org.thingsboard.server.dao.timeseries.TimeseriesService;
46 44 import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
47 45 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
48 46 import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
... ... @@ -210,12 +208,12 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
210 208
211 209 Assert.assertEquals(1, update.getCmdId());
212 210
213   - pageData = update.getData();
214   - Assert.assertNotNull(pageData);
215   - Assert.assertEquals(1, pageData.getData().size());
216   - Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
217   - Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES));
218   - TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
  211 + List<EntityData> listData = update.getUpdate();
  212 + Assert.assertNotNull(listData);
  213 + Assert.assertEquals(1, listData.size());
  214 + Assert.assertEquals(device.getId(), listData.get(0).getEntityId());
  215 + Assert.assertNotNull(listData.get(0).getLatest().get(EntityKeyType.TIME_SERIES));
  216 + TsValue tsValue = listData.get(0).getLatest().get(EntityKeyType.TIME_SERIES).get("temperature");
219 217 Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsValue);
220 218
221 219 now = System.currentTimeMillis();
... ... @@ -299,12 +297,12 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
299 297
300 298 Assert.assertEquals(1, update.getCmdId());
301 299
302   - pageData = update.getData();
303   - Assert.assertNotNull(pageData);
304   - Assert.assertEquals(1, pageData.getData().size());
305   - Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId());
306   - Assert.assertNotNull(pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE));
307   - TsValue tsValue = pageData.getData().get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey");
  300 + List<EntityData> listData = update.getUpdate();
  301 + Assert.assertNotNull(listData);
  302 + Assert.assertEquals(1, listData.size());
  303 + Assert.assertEquals(device.getId(), listData.get(0).getEntityId());
  304 + Assert.assertNotNull(listData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE));
  305 + TsValue tsValue = listData.get(0).getLatest().get(EntityKeyType.SERVER_ATTRIBUTE).get("serverAttributeKey");
308 306 Assert.assertEquals(new TsValue(dataPoint1.getLastUpdateTs(), dataPoint1.getValueAsString()), tsValue);
309 307
310 308 now = System.currentTimeMillis();
... ... @@ -386,7 +384,6 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
386 384 Assert.assertEquals(0, pageData.getData().get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey").getTs());
387 385 Assert.assertEquals("", pageData.getData().get(0).getLatest().get(EntityKeyType.ATTRIBUTE).get("anyAttributeKey").getValue());
388 386
389   -
390 387 wsClient.registerWaitForUpdate();
391 388 AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(now - TimeUnit.MINUTES.toMillis(1), new LongDataEntry("serverAttributeKey", 42L));
392 389 List<AttributeKvEntry> tsData = Arrays.asList(dataPoint1);
... ...
... ... @@ -26,9 +26,9 @@ import java.util.Arrays;
26 26
27 27 @RunWith(ClasspathSuite.class)
28 28 @ClasspathSuite.ClassnameFilters({
29   -// "org.thingsboard.server.controller.sql.WebsocketApiSqlTest",
  29 + "org.thingsboard.server.controller.sql.WebsocketApiSqlTest",
30 30 // "org.thingsboard.server.controller.sql.EntityQueryControllerSqlTest",
31   - "org.thingsboard.server.controller.sql.*Test",
  31 +// "org.thingsboard.server.controller.sql.*Test",
32 32 })
33 33 public class ControllerSqlTestSuite {
34 34
... ...