Commit 29fc2386764a2971b12b7ef7affd4113700d3fe6

Authored by Dima Landiak
1 parent c1f3abf9

refactoring and changing class names

Showing 17 changed files with 174 additions and 92 deletions
... ... @@ -28,7 +28,13 @@ import org.springframework.http.HttpStatus;
28 28 import org.springframework.http.ResponseEntity;
29 29 import org.springframework.security.access.prepost.PreAuthorize;
30 30 import org.springframework.util.StringUtils;
31   -import org.springframework.web.bind.annotation.*;
  31 +import org.springframework.web.bind.annotation.PathVariable;
  32 +import org.springframework.web.bind.annotation.RequestBody;
  33 +import org.springframework.web.bind.annotation.RequestMapping;
  34 +import org.springframework.web.bind.annotation.RequestMethod;
  35 +import org.springframework.web.bind.annotation.RequestParam;
  36 +import org.springframework.web.bind.annotation.ResponseBody;
  37 +import org.springframework.web.bind.annotation.RestController;
32 38 import org.springframework.web.context.request.async.DeferredResult;
33 39 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
34 40 import org.thingsboard.server.common.data.DataConstants;
... ... @@ -39,7 +45,19 @@ import org.thingsboard.server.common.data.id.DeviceId;
39 45 import org.thingsboard.server.common.data.id.EntityId;
40 46 import org.thingsboard.server.common.data.id.EntityIdFactory;
41 47 import org.thingsboard.server.common.data.id.UUIDBased;
42   -import org.thingsboard.server.common.data.kv.*;
  48 +import org.thingsboard.server.common.data.kv.Aggregation;
  49 +import org.thingsboard.server.common.data.kv.AttributeKey;
  50 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  51 +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
  52 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
  53 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  54 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  55 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  56 +import org.thingsboard.server.common.data.kv.KvEntry;
  57 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  58 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
  59 +import org.thingsboard.server.common.data.kv.StringDataEntry;
  60 +import org.thingsboard.server.common.data.kv.TsKvEntry;
43 61 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
44 62 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
45 63 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
... ... @@ -56,7 +74,13 @@ import org.thingsboard.server.service.telemetry.exception.UncheckedApiException;
56 74 import javax.annotation.Nullable;
57 75 import javax.annotation.PostConstruct;
58 76 import javax.annotation.PreDestroy;
59   -import java.util.*;
  77 +import java.util.ArrayList;
  78 +import java.util.Arrays;
  79 +import java.util.HashSet;
  80 +import java.util.LinkedHashMap;
  81 +import java.util.List;
  82 +import java.util.Map;
  83 +import java.util.Set;
60 84 import java.util.concurrent.ExecutorService;
61 85 import java.util.concurrent.Executors;
62 86 import java.util.stream.Collectors;
... ... @@ -176,7 +200,7 @@ public class TelemetryController extends BaseController {
176 200 (result, entityId) -> {
177 201 // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
178 202 Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
179   - List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg, "DESC"))
  203 + List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg))
180 204 .collect(Collectors.toList());
181 205
182 206 Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
... ...
... ... @@ -35,16 +35,16 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
35 35 import org.thingsboard.server.common.data.id.TenantId;
36 36 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
37 37 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
38   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  38 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
39 39 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
40 40 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
41 41 import org.thingsboard.server.common.data.kv.DataType;
42 42 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
43 43 import org.thingsboard.server.common.data.kv.KvEntry;
44 44 import org.thingsboard.server.common.data.kv.LongDataEntry;
  45 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
45 46 import org.thingsboard.server.common.data.kv.StringDataEntry;
46 47 import org.thingsboard.server.common.data.kv.TsKvEntry;
47   -import org.thingsboard.server.common.data.kv.TsKvQuery;
48 48 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
49 49 import org.thingsboard.server.common.msg.cluster.ServerAddress;
50 50 import org.thingsboard.server.dao.attributes.AttributesService;
... ... @@ -370,9 +370,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
370 370 e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor);
371 371 } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
372 372 long curTs = System.currentTimeMillis();
373   - List<TsKvQuery> queries = new ArrayList<>();
  373 + List<ReadTsKvQuery> queries = new ArrayList<>();
374 374 subscription.getKeyStates().entrySet().forEach(e -> {
375   - queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
  375 + queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
376 376 });
377 377
378 378 DonAsynchron.withCallback(tsService.findAll(entityId, queries),
... ...
... ... @@ -28,12 +28,22 @@ import org.springframework.util.StringUtils;
28 28 import org.thingsboard.server.common.data.DataConstants;
29 29 import org.thingsboard.server.common.data.id.EntityId;
30 30 import org.thingsboard.server.common.data.id.EntityIdFactory;
31   -import org.thingsboard.server.common.data.kv.*;
  31 +import org.thingsboard.server.common.data.kv.Aggregation;
  32 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  33 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
  34 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  35 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
  36 +import org.thingsboard.server.common.data.kv.TsKvEntry;
32 37 import org.thingsboard.server.dao.attributes.AttributesService;
33 38 import org.thingsboard.server.dao.timeseries.TimeseriesService;
34 39 import org.thingsboard.server.service.security.AccessValidator;
35 40 import org.thingsboard.server.service.security.ValidationResult;
36   -import org.thingsboard.server.service.telemetry.cmd.*;
  41 +import org.thingsboard.server.service.telemetry.cmd.AttributesSubscriptionCmd;
  42 +import org.thingsboard.server.service.telemetry.cmd.GetHistoryCmd;
  43 +import org.thingsboard.server.service.telemetry.cmd.SubscriptionCmd;
  44 +import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmd;
  45 +import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
  46 +import org.thingsboard.server.service.telemetry.cmd.TimeseriesSubscriptionCmd;
37 47 import org.thingsboard.server.service.telemetry.exception.UnauthorizedException;
38 48 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
39 49 import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
... ... @@ -43,7 +53,14 @@ import javax.annotation.Nullable;
43 53 import javax.annotation.PostConstruct;
44 54 import javax.annotation.PreDestroy;
45 55 import java.io.IOException;
46   -import java.util.*;
  56 +import java.util.ArrayList;
  57 +import java.util.Collections;
  58 +import java.util.HashMap;
  59 +import java.util.HashSet;
  60 +import java.util.List;
  61 +import java.util.Map;
  62 +import java.util.Optional;
  63 +import java.util.Set;
47 64 import java.util.concurrent.ConcurrentHashMap;
48 65 import java.util.concurrent.ConcurrentMap;
49 66 import java.util.concurrent.ExecutorService;
... ... @@ -234,7 +251,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
234 251 }
235 252 EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
236 253 List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
237   - List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC"))
  254 + List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
238 255 .collect(Collectors.toList());
239 256
240 257 FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
... ... @@ -320,8 +337,8 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
320 337 log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
321 338 startTs = cmd.getStartTs();
322 339 long endTs = cmd.getStartTs() + cmd.getTimeWindow();
323   - List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
324   - getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC")).collect(Collectors.toList());
  340 + List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(),
  341 + getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
325 342
326 343 final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
327 344 accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
... ...
... ... @@ -18,7 +18,7 @@ package org.thingsboard.server.common.data.kv;
18 18 import lombok.Data;
19 19
20 20 @Data
21   -public class BaseDeleteTsKvQuery extends BaseQuery implements DeleteTsKvQuery {
  21 +public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuery {
22 22
23 23 private final Boolean rewriteLatestIfDeleted;
24 24
... ...
common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java renamed from common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseQuery.java
... ... @@ -18,16 +18,28 @@ package org.thingsboard.server.common.data.kv;
18 18 import lombok.Data;
19 19
20 20 @Data
21   -public class BaseQuery implements Query {
  21 +public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
22 22
23   - private final String key;
24   - private final long startTs;
25   - private final long endTs;
  23 + private final long interval;
  24 + private final int limit;
  25 + private final Aggregation aggregation;
  26 + private final String orderBy;
26 27
27   - public BaseQuery(String key, long startTs, long endTs) {
28   - this.key = key;
29   - this.startTs = startTs;
30   - this.endTs = endTs;
  28 + public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
  29 + this(key, startTs, endTs, interval, limit, aggregation, "DESC");
  30 + }
  31 +
  32 + public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation,
  33 + String orderBy) {
  34 + super(key, startTs, endTs);
  35 + this.interval = interval;
  36 + this.limit = limit;
  37 + this.aggregation = aggregation;
  38 + this.orderBy = orderBy;
  39 + }
  40 +
  41 + public BaseReadTsKvQuery(String key, long startTs, long endTs) {
  42 + this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
31 43 }
32 44
33 45 }
... ...
... ... @@ -18,24 +18,16 @@ package org.thingsboard.server.common.data.kv;
18 18 import lombok.Data;
19 19
20 20 @Data
21   -public class BaseTsKvQuery extends BaseQuery implements TsKvQuery {
  21 +public class BaseTsKvQuery implements TsKvQuery {
22 22
23   - private final long interval;
24   - private final int limit;
25   - private final Aggregation aggregation;
26   - private final String orderBy;
27   -
28   - public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation,
29   - String orderBy) {
30   - super(key, startTs, endTs);
31   - this.interval = interval;
32   - this.limit = limit;
33   - this.aggregation = aggregation;
34   - this.orderBy = orderBy;
35   - }
  23 + private final String key;
  24 + private final long startTs;
  25 + private final long endTs;
36 26
37 27 public BaseTsKvQuery(String key, long startTs, long endTs) {
38   - this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
  28 + this.key = key;
  29 + this.startTs = startTs;
  30 + this.endTs = endTs;
39 31 }
40 32
41 33 }
... ...
... ... @@ -15,7 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.common.data.kv;
17 17
18   -public interface DeleteTsKvQuery extends Query {
  18 +public interface DeleteTsKvQuery extends TsKvQuery {
19 19
20 20 Boolean getRewriteLatestIfDeleted();
21 21
... ...
common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java renamed from common/data/src/main/java/org/thingsboard/server/common/data/kv/Query.java
... ... @@ -15,12 +15,14 @@
15 15 */
16 16 package org.thingsboard.server.common.data.kv;
17 17
18   -public interface Query {
  18 +public interface ReadTsKvQuery extends TsKvQuery {
19 19
20   - String getKey();
  20 + long getInterval();
21 21
22   - long getStartTs();
  22 + int getLimit();
23 23
24   - long getEndTs();
  24 + Aggregation getAggregation();
  25 +
  26 + String getOrderBy();
25 27
26 28 }
... ...
... ... @@ -15,14 +15,12 @@
15 15 */
16 16 package org.thingsboard.server.common.data.kv;
17 17
18   -public interface TsKvQuery extends Query {
  18 +public interface TsKvQuery {
19 19
20   - long getInterval();
  20 + String getKey();
21 21
22   - int getLimit();
  22 + long getStartTs();
23 23
24   - Aggregation getAggregation();
25   -
26   - String getOrderBy();
  24 + long getEndTs();
27 25
28 26 }
... ...
... ... @@ -17,7 +17,11 @@ package org.thingsboard.server.dao.sql.timeseries;
17 17
18 18 import com.google.common.base.Function;
19 19 import com.google.common.collect.Lists;
20   -import com.google.common.util.concurrent.*;
  20 +import com.google.common.util.concurrent.Futures;
  21 +import com.google.common.util.concurrent.ListenableFuture;
  22 +import com.google.common.util.concurrent.ListeningExecutorService;
  23 +import com.google.common.util.concurrent.MoreExecutors;
  24 +import com.google.common.util.concurrent.SettableFuture;
21 25 import lombok.extern.slf4j.Slf4j;
22 26 import org.springframework.beans.factory.annotation.Autowired;
23 27 import org.springframework.beans.factory.annotation.Value;
... ... @@ -25,7 +29,12 @@ import org.springframework.data.domain.PageRequest;
25 29 import org.springframework.stereotype.Component;
26 30 import org.thingsboard.server.common.data.UUIDConverter;
27 31 import org.thingsboard.server.common.data.id.EntityId;
28   -import org.thingsboard.server.common.data.kv.*;
  32 +import org.thingsboard.server.common.data.kv.Aggregation;
  33 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  34 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  35 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
  36 +import org.thingsboard.server.common.data.kv.StringDataEntry;
  37 +import org.thingsboard.server.common.data.kv.TsKvEntry;
29 38 import org.thingsboard.server.dao.DaoUtil;
30 39 import org.thingsboard.server.dao.model.sql.TsKvEntity;
31 40 import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
... ... @@ -94,7 +103,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
94 103 }
95 104
96 105 @Override
97   - public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
  106 + public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries) {
98 107 List<ListenableFuture<List<TsKvEntry>>> futures = queries
99 108 .stream()
100 109 .map(query -> findAllAsync(entityId, query))
... ... @@ -113,7 +122,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
113 122 }, service);
114 123 }
115 124
116   - private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, TsKvQuery query) {
  125 + private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query) {
117 126 if (query.getAggregation() == Aggregation.NONE) {
118 127 return findAllAsyncWithLimit(entityId, query);
119 128 } else {
... ... @@ -220,7 +229,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
220 229 });
221 230 }
222 231
223   - private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
  232 + private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
224 233 return Futures.immediateFuture(
225 234 DaoUtil.convertDataList(
226 235 tsKvRepository.findAllWithLimit(
... ...
... ... @@ -23,8 +23,8 @@ import org.springframework.beans.factory.annotation.Autowired;
23 23 import org.springframework.stereotype.Service;
24 24 import org.thingsboard.server.common.data.id.EntityId;
25 25 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  26 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
26 27 import org.thingsboard.server.common.data.kv.TsKvEntry;
27   -import org.thingsboard.server.common.data.kv.TsKvQuery;
28 28 import org.thingsboard.server.dao.exception.IncorrectParameterException;
29 29 import org.thingsboard.server.dao.service.Validator;
30 30
... ... @@ -47,7 +47,7 @@ public class BaseTimeseriesService implements TimeseriesService {
47 47 private TimeseriesDao timeseriesDao;
48 48
49 49 @Override
50   - public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries) {
  50 + public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<ReadTsKvQuery> queries) {
51 51 validate(entityId);
52 52 queries.forEach(BaseTimeseriesService::validate);
53 53 return timeseriesDao.findAllAsync(entityId, queries);
... ... @@ -118,13 +118,13 @@ public class BaseTimeseriesService implements TimeseriesService {
118 118 Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
119 119 }
120 120
121   - private static void validate(TsKvQuery query) {
  121 + private static void validate(ReadTsKvQuery query) {
122 122 if (query == null) {
123   - throw new IncorrectParameterException("TsKvQuery can't be null");
  123 + throw new IncorrectParameterException("ReadTsKvQuery can't be null");
124 124 } else if (isBlank(query.getKey())) {
125   - throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
  125 + throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Key can't be empty");
126 126 } else if (query.getAggregation() == null) {
127   - throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
  127 + throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty");
128 128 }
129 129 }
130 130
... ...
... ... @@ -15,7 +15,12 @@
15 15 */
16 16 package org.thingsboard.server.dao.timeseries;
17 17
18   -import com.datastax.driver.core.*;
  18 +import com.datastax.driver.core.BoundStatement;
  19 +import com.datastax.driver.core.PreparedStatement;
  20 +import com.datastax.driver.core.ResultSet;
  21 +import com.datastax.driver.core.ResultSetFuture;
  22 +import com.datastax.driver.core.Row;
  23 +import com.datastax.driver.core.Statement;
19 24 import com.datastax.driver.core.querybuilder.QueryBuilder;
20 25 import com.datastax.driver.core.querybuilder.Select;
21 26 import com.google.common.base.Function;
... ... @@ -29,8 +34,18 @@ import org.springframework.beans.factory.annotation.Value;
29 34 import org.springframework.core.env.Environment;
30 35 import org.springframework.stereotype.Component;
31 36 import org.thingsboard.server.common.data.id.EntityId;
32   -import org.thingsboard.server.common.data.kv.*;
  37 +import org.thingsboard.server.common.data.kv.Aggregation;
  38 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
  39 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  40 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
33 41 import org.thingsboard.server.common.data.kv.DataType;
  42 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  43 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  44 +import org.thingsboard.server.common.data.kv.KvEntry;
  45 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  46 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
  47 +import org.thingsboard.server.common.data.kv.StringDataEntry;
  48 +import org.thingsboard.server.common.data.kv.TsKvEntry;
34 49 import org.thingsboard.server.dao.model.ModelConstants;
35 50 import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao;
36 51 import org.thingsboard.server.dao.util.NoSqlDao;
... ... @@ -41,7 +56,11 @@ import javax.annotation.PreDestroy;
41 56 import java.time.Instant;
42 57 import java.time.LocalDateTime;
43 58 import java.time.ZoneOffset;
44   -import java.util.*;
  59 +import java.util.ArrayList;
  60 +import java.util.Arrays;
  61 +import java.util.Collections;
  62 +import java.util.List;
  63 +import java.util.Optional;
45 64 import java.util.stream.Collectors;
46 65
47 66 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
... ... @@ -111,7 +130,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
111 130 }
112 131
113 132 @Override
114   - public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
  133 + public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries) {
115 134 List<ListenableFuture<List<TsKvEntry>>> futures = queries.stream().map(query -> findAllAsync(entityId, query)).collect(Collectors.toList());
116 135 return Futures.transform(Futures.allAsList(futures), new Function<List<List<TsKvEntry>>, List<TsKvEntry>>() {
117 136 @Nullable
... ... @@ -128,7 +147,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
128 147 }
129 148
130 149
131   - private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, TsKvQuery query) {
  150 + private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query) {
132 151 if (query.getAggregation() == Aggregation.NONE) {
133 152 return findAllAsyncWithLimit(entityId, query);
134 153 } else {
... ... @@ -138,7 +157,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
138 157 while (stepTs < query.getEndTs()) {
139 158 long startTs = stepTs;
140 159 long endTs = stepTs + step;
141   - TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
  160 + ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
142 161 futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
143 162 stepTs = endTs;
144 163 }
... ... @@ -157,7 +176,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
157 176 return tsFormat.getTruncateUnit().equals(TsPartitionDate.EPOCH_START);
158 177 }
159 178
160   - private ListenableFuture<List<Long>> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
  179 + private ListenableFuture<List<Long>> getPartitionsFuture(ReadTsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
161 180 if (isFixedPartitioning()) { //no need to fetch partitions from DB
162 181 return Futures.immediateFuture(FIXED_PARTITION);
163 182 }
... ... @@ -165,7 +184,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
165 184 return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
166 185 }
167 186
168   - private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
  187 + private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
169 188 long minPartition = toPartitionTs(query.getStartTs());
170 189 long maxPartition = toPartitionTs(query.getEndTs());
171 190 final ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
... ... @@ -221,7 +240,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
221 240 }
222 241 }
223 242
224   - private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, TsKvQuery query, long minPartition, long maxPartition) {
  243 + private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, ReadTsKvQuery query, long minPartition, long maxPartition) {
225 244 final Aggregation aggregation = query.getAggregation();
226 245 final String key = query.getKey();
227 246 final long startTs = query.getStartTs();
... ... @@ -448,7 +467,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
448 467 private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
449 468 long startTs = 0;
450 469 long endTs = query.getStartTs() - 1;
451   - TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
  470 + ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
452 471 Aggregation.NONE, DESC_ORDER);
453 472 ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
454 473
... ...
... ... @@ -16,7 +16,7 @@
16 16 package org.thingsboard.server.dao.timeseries;
17 17
18 18 import lombok.Getter;
19   -import org.thingsboard.server.common.data.kv.Query;
  19 +import org.thingsboard.server.common.data.kv.TsKvQuery;
20 20
21 21 import java.util.List;
22 22 import java.util.UUID;
... ... @@ -37,7 +37,7 @@ public class QueryCursor {
37 37 final List<Long> partitions;
38 38 private int partitionIndex;
39 39
40   - public QueryCursor(String entityType, UUID entityId, Query baseQuery, List<Long> partitions) {
  40 + public QueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
41 41 this.entityType = entityType;
42 42 this.entityId = entityId;
43 43 this.key = baseQuery.getKey();
... ...
... ... @@ -18,8 +18,8 @@ package org.thingsboard.server.dao.timeseries;
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.id.EntityId;
20 20 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  21 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
21 22 import org.thingsboard.server.common.data.kv.TsKvEntry;
22   -import org.thingsboard.server.common.data.kv.TsKvQuery;
23 23
24 24 import java.util.List;
25 25
... ... @@ -28,7 +28,7 @@ import java.util.List;
28 28 */
29 29 public interface TimeseriesDao {
30 30
31   - ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries);
  31 + ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries);
32 32
33 33 ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key);
34 34
... ...
... ... @@ -18,8 +18,8 @@ package org.thingsboard.server.dao.timeseries;
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.id.EntityId;
20 20 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  21 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
21 22 import org.thingsboard.server.common.data.kv.TsKvEntry;
22   -import org.thingsboard.server.common.data.kv.TsKvQuery;
23 23
24 24 import java.util.Collection;
25 25 import java.util.List;
... ... @@ -29,7 +29,7 @@ import java.util.List;
29 29 */
30 30 public interface TimeseriesService {
31 31
32   - ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries);
  32 + ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<ReadTsKvQuery> queries);
33 33
34 34 ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys);
35 35
... ...
... ... @@ -16,8 +16,8 @@
16 16 package org.thingsboard.server.dao.timeseries;
17 17
18 18 import lombok.Getter;
  19 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
19 20 import org.thingsboard.server.common.data.kv.TsKvEntry;
20   -import org.thingsboard.server.common.data.kv.TsKvQuery;
21 21
22 22 import java.util.ArrayList;
23 23 import java.util.List;
... ... @@ -38,7 +38,7 @@ public class TsKvQueryCursor extends QueryCursor {
38 38 private int partitionIndex;
39 39 private int currentLimit;
40 40
41   - public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
  41 + public TsKvQueryCursor(String entityType, UUID entityId, ReadTsKvQuery baseQuery, List<Long> partitions) {
42 42 super(entityType, entityId, baseQuery, partitions);
43 43 this.orderBy = baseQuery.getOrderBy();
44 44 this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
... ...
... ... @@ -20,7 +20,16 @@ import lombok.extern.slf4j.Slf4j;
20 20 import org.junit.Assert;
21 21 import org.junit.Test;
22 22 import org.thingsboard.server.common.data.id.DeviceId;
23   -import org.thingsboard.server.common.data.kv.*;
  23 +import org.thingsboard.server.common.data.kv.Aggregation;
  24 +import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
  25 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
  26 +import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  27 +import org.thingsboard.server.common.data.kv.BooleanDataEntry;
  28 +import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  29 +import org.thingsboard.server.common.data.kv.KvEntry;
  30 +import org.thingsboard.server.common.data.kv.LongDataEntry;
  31 +import org.thingsboard.server.common.data.kv.StringDataEntry;
  32 +import org.thingsboard.server.common.data.kv.TsKvEntry;
24 33 import org.thingsboard.server.dao.service.AbstractServiceTest;
25 34
26 35 import java.util.ArrayList;
... ... @@ -106,7 +115,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
106 115 new BaseDeleteTsKvQuery(STRING_KEY, 15000, 45000))).get();
107 116
108 117 List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
109   - new BaseTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE, DESC_ORDER))).get();
  118 + new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get();
110 119 Assert.assertEquals(1, list.size());
111 120
112 121 List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
... ... @@ -127,8 +136,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
127 136 entries.add(save(deviceId, 45000, 500));
128 137 entries.add(save(deviceId, 55000, 600));
129 138
130   - List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
131   - 60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get();
  139 + List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  140 + 60000, 20000, 3, Aggregation.NONE))).get();
132 141 assertEquals(3, list.size());
133 142 assertEquals(55000, list.get(0).getTs());
134 143 assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
... ... @@ -139,8 +148,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
139 148 assertEquals(35000, list.get(2).getTs());
140 149 assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
141 150
142   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
143   - 60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get();
  151 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  152 + 60000, 20000, 3, Aggregation.AVG))).get();
144 153 assertEquals(3, list.size());
145 154 assertEquals(10000, list.get(0).getTs());
146 155 assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
... ... @@ -151,8 +160,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
151 160 assertEquals(50000, list.get(2).getTs());
152 161 assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
153 162
154   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
155   - 60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get();
  163 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  164 + 60000, 20000, 3, Aggregation.SUM))).get();
156 165
157 166 assertEquals(3, list.size());
158 167 assertEquals(10000, list.get(0).getTs());
... ... @@ -164,8 +173,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
164 173 assertEquals(50000, list.get(2).getTs());
165 174 assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
166 175
167   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
168   - 60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get();
  176 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  177 + 60000, 20000, 3, Aggregation.MIN))).get();
169 178
170 179 assertEquals(3, list.size());
171 180 assertEquals(10000, list.get(0).getTs());
... ... @@ -177,8 +186,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
177 186 assertEquals(50000, list.get(2).getTs());
178 187 assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
179 188
180   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
181   - 60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get();
  189 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  190 + 60000, 20000, 3, Aggregation.MAX))).get();
182 191
183 192 assertEquals(3, list.size());
184 193 assertEquals(10000, list.get(0).getTs());
... ... @@ -190,8 +199,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
190 199 assertEquals(50000, list.get(2).getTs());
191 200 assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
192 201
193   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
194   - 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get();
  202 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  203 + 60000, 20000, 3, Aggregation.COUNT))).get();
195 204
196 205 assertEquals(3, list.size());
197 206 assertEquals(10000, list.get(0).getTs());
... ...