Commit c1f3abf9de7e6dfb1d1199104f053bda9165e74b

Authored by Dima Landiak
1 parent 753ff4e3

refactoring

... ... @@ -28,13 +28,7 @@ import org.springframework.http.HttpStatus;
28 28 import org.springframework.http.ResponseEntity;
29 29 import org.springframework.security.access.prepost.PreAuthorize;
30 30 import org.springframework.util.StringUtils;
31   -import org.springframework.web.bind.annotation.PathVariable;
32   -import org.springframework.web.bind.annotation.RequestBody;
33   -import org.springframework.web.bind.annotation.RequestMapping;
34   -import org.springframework.web.bind.annotation.RequestMethod;
35   -import org.springframework.web.bind.annotation.RequestParam;
36   -import org.springframework.web.bind.annotation.ResponseBody;
37   -import org.springframework.web.bind.annotation.RestController;
  31 +import org.springframework.web.bind.annotation.*;
38 32 import org.springframework.web.context.request.async.DeferredResult;
39 33 import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
40 34 import org.thingsboard.server.common.data.DataConstants;
... ... @@ -45,19 +39,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
45 39 import org.thingsboard.server.common.data.id.EntityId;
46 40 import org.thingsboard.server.common.data.id.EntityIdFactory;
47 41 import org.thingsboard.server.common.data.id.UUIDBased;
48   -import org.thingsboard.server.common.data.kv.Aggregation;
49   -import org.thingsboard.server.common.data.kv.AttributeKey;
50   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
51   -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
52   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
53   -import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
54   -import org.thingsboard.server.common.data.kv.BooleanDataEntry;
55   -import org.thingsboard.server.common.data.kv.DoubleDataEntry;
56   -import org.thingsboard.server.common.data.kv.KvEntry;
57   -import org.thingsboard.server.common.data.kv.LongDataEntry;
58   -import org.thingsboard.server.common.data.kv.StringDataEntry;
59   -import org.thingsboard.server.common.data.kv.TsKvEntry;
60   -import org.thingsboard.server.common.data.kv.TsKvQuery;
  42 +import org.thingsboard.server.common.data.kv.*;
61 43 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
62 44 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
63 45 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
... ... @@ -74,14 +56,7 @@ import org.thingsboard.server.service.telemetry.exception.UncheckedApiException;
74 56 import javax.annotation.Nullable;
75 57 import javax.annotation.PostConstruct;
76 58 import javax.annotation.PreDestroy;
77   -import java.util.ArrayList;
78   -import java.util.Arrays;
79   -import java.util.HashSet;
80   -import java.util.LinkedHashMap;
81   -import java.util.List;
82   -import java.util.Map;
83   -import java.util.Set;
84   -import java.util.UUID;
  59 +import java.util.*;
85 60 import java.util.concurrent.ExecutorService;
86 61 import java.util.concurrent.Executors;
87 62 import java.util.stream.Collectors;
... ... @@ -201,7 +176,7 @@ public class TelemetryController extends BaseController {
201 176 (result, entityId) -> {
202 177 // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
203 178 Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
204   - List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg, "DESC", false))
  179 + List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg, "DESC"))
205 180 .collect(Collectors.toList());
206 181
207 182 Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
... ...
... ... @@ -28,22 +28,12 @@ import org.springframework.util.StringUtils;
28 28 import org.thingsboard.server.common.data.DataConstants;
29 29 import org.thingsboard.server.common.data.id.EntityId;
30 30 import org.thingsboard.server.common.data.id.EntityIdFactory;
31   -import org.thingsboard.server.common.data.kv.Aggregation;
32   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
33   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
34   -import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
35   -import org.thingsboard.server.common.data.kv.TsKvEntry;
36   -import org.thingsboard.server.common.data.kv.TsKvQuery;
  31 +import org.thingsboard.server.common.data.kv.*;
37 32 import org.thingsboard.server.dao.attributes.AttributesService;
38 33 import org.thingsboard.server.dao.timeseries.TimeseriesService;
39 34 import org.thingsboard.server.service.security.AccessValidator;
40 35 import org.thingsboard.server.service.security.ValidationResult;
41   -import org.thingsboard.server.service.telemetry.cmd.AttributesSubscriptionCmd;
42   -import org.thingsboard.server.service.telemetry.cmd.GetHistoryCmd;
43   -import org.thingsboard.server.service.telemetry.cmd.SubscriptionCmd;
44   -import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmd;
45   -import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
46   -import org.thingsboard.server.service.telemetry.cmd.TimeseriesSubscriptionCmd;
  36 +import org.thingsboard.server.service.telemetry.cmd.*;
47 37 import org.thingsboard.server.service.telemetry.exception.UnauthorizedException;
48 38 import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
49 39 import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
... ... @@ -53,14 +43,7 @@ import javax.annotation.Nullable;
53 43 import javax.annotation.PostConstruct;
54 44 import javax.annotation.PreDestroy;
55 45 import java.io.IOException;
56   -import java.util.ArrayList;
57   -import java.util.Collections;
58   -import java.util.HashMap;
59   -import java.util.HashSet;
60   -import java.util.List;
61   -import java.util.Map;
62   -import java.util.Optional;
63   -import java.util.Set;
  46 +import java.util.*;
64 47 import java.util.concurrent.ConcurrentHashMap;
65 48 import java.util.concurrent.ConcurrentMap;
66 49 import java.util.concurrent.ExecutorService;
... ... @@ -251,7 +234,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
251 234 }
252 235 EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
253 236 List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
254   - List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC", false))
  237 + List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC"))
255 238 .collect(Collectors.toList());
256 239
257 240 FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
... ... @@ -338,7 +321,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
338 321 startTs = cmd.getStartTs();
339 322 long endTs = cmd.getStartTs() + cmd.getTimeWindow();
340 323 List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
341   - getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC", false)).collect(Collectors.toList());
  324 + getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC")).collect(Collectors.toList());
342 325
343 326 final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
344 327 accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.kv;
  17 +
  18 +import lombok.Data;
  19 +
  20 +@Data
  21 +public class BaseDeleteTsKvQuery extends BaseQuery implements DeleteTsKvQuery {
  22 +
  23 + private final Boolean rewriteLatestIfDeleted;
  24 +
  25 + public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted) {
  26 + super(key, startTs, endTs);
  27 + this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
  28 + }
  29 +
  30 + public BaseDeleteTsKvQuery(String key, long startTs, long endTs) {
  31 + this(key, startTs, endTs, false);
  32 + }
  33 +
  34 +
  35 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.kv;
  17 +
  18 +import lombok.Data;
  19 +
  20 +@Data
  21 +public class BaseQuery implements Query {
  22 +
  23 + private final String key;
  24 + private final long startTs;
  25 + private final long endTs;
  26 +
  27 + public BaseQuery(String key, long startTs, long endTs) {
  28 + this.key = key;
  29 + this.startTs = startTs;
  30 + this.endTs = endTs;
  31 + }
  32 +
  33 +}
... ...
... ... @@ -18,31 +18,24 @@ package org.thingsboard.server.common.data.kv;
18 18 import lombok.Data;
19 19
20 20 @Data
21   -public class BaseTsKvQuery implements TsKvQuery {
  21 +public class BaseTsKvQuery extends BaseQuery implements TsKvQuery {
22 22
23   - private final String key;
24   - private final long startTs;
25   - private final long endTs;
26 23 private final long interval;
27 24 private final int limit;
28 25 private final Aggregation aggregation;
29 26 private final String orderBy;
30   - private final Boolean rewriteLatestIfDeleted;
31 27
32   - public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy,
33   - boolean rewriteLatestIfDeleted) {
34   - this.key = key;
35   - this.startTs = startTs;
36   - this.endTs = endTs;
  28 + public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation,
  29 + String orderBy) {
  30 + super(key, startTs, endTs);
37 31 this.interval = interval;
38 32 this.limit = limit;
39 33 this.aggregation = aggregation;
40 34 this.orderBy = orderBy;
41   - this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
42 35 }
43 36
44 37 public BaseTsKvQuery(String key, long startTs, long endTs) {
45   - this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC", false);
  38 + this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
46 39 }
47 40
48 41 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.kv;
  17 +
  18 +public interface DeleteTsKvQuery extends Query {
  19 +
  20 + Boolean getRewriteLatestIfDeleted();
  21 +
  22 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.kv;
  17 +
  18 +public interface Query {
  19 +
  20 + String getKey();
  21 +
  22 + long getStartTs();
  23 +
  24 + long getEndTs();
  25 +
  26 +}
... ...
... ... @@ -15,13 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.common.data.kv;
17 17
18   -public interface TsKvQuery {
19   -
20   - String getKey();
21   -
22   - long getStartTs();
23   -
24   - long getEndTs();
  18 +public interface TsKvQuery extends Query {
25 19
26 20 long getInterval();
27 21
... ... @@ -31,5 +25,4 @@ public interface TsKvQuery {
31 25
32 26 String getOrderBy();
33 27
34   - Boolean getRewriteLatestIfDeleted();
35 28 }
... ...
... ... @@ -17,11 +17,7 @@ package org.thingsboard.server.dao.sql.timeseries;
17 17
18 18 import com.google.common.base.Function;
19 19 import com.google.common.collect.Lists;
20   -import com.google.common.util.concurrent.Futures;
21   -import com.google.common.util.concurrent.ListenableFuture;
22   -import com.google.common.util.concurrent.ListeningExecutorService;
23   -import com.google.common.util.concurrent.MoreExecutors;
24   -import com.google.common.util.concurrent.SettableFuture;
  20 +import com.google.common.util.concurrent.*;
25 21 import lombok.extern.slf4j.Slf4j;
26 22 import org.springframework.beans.factory.annotation.Autowired;
27 23 import org.springframework.beans.factory.annotation.Value;
... ... @@ -29,11 +25,7 @@ import org.springframework.data.domain.PageRequest;
29 25 import org.springframework.stereotype.Component;
30 26 import org.thingsboard.server.common.data.UUIDConverter;
31 27 import org.thingsboard.server.common.data.id.EntityId;
32   -import org.thingsboard.server.common.data.kv.Aggregation;
33   -import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
34   -import org.thingsboard.server.common.data.kv.StringDataEntry;
35   -import org.thingsboard.server.common.data.kv.TsKvEntry;
36   -import org.thingsboard.server.common.data.kv.TsKvQuery;
  28 +import org.thingsboard.server.common.data.kv.*;
37 29 import org.thingsboard.server.dao.DaoUtil;
38 30 import org.thingsboard.server.dao.model.sql.TsKvEntity;
39 31 import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
... ... @@ -307,7 +299,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
307 299 }
308 300
309 301 @Override
310   - public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
  302 + public ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query) {
311 303 return service.submit(() -> {
312 304 tsKvRepository.delete(
313 305 fromTimeUUID(entityId.getId()),
... ... @@ -320,7 +312,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
320 312 }
321 313
322 314 @Override
323   - public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
  315 + public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
324 316 TsKvLatestEntity latestEntity = new TsKvLatestEntity();
325 317 latestEntity.setEntityType(entityId.getEntityType());
326 318 latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
... ... @@ -332,7 +324,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
332 324 }
333 325
334 326 @Override
335   - public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
  327 + public ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query) {
336 328 return service.submit(() -> null);
337 329 }
338 330
... ...
... ... @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
22 22 import org.springframework.beans.factory.annotation.Autowired;
23 23 import org.springframework.stereotype.Service;
24 24 import org.thingsboard.server.common.data.id.EntityId;
  25 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
25 26 import org.thingsboard.server.common.data.kv.TsKvEntry;
26 27 import org.thingsboard.server.common.data.kv.TsKvQuery;
27 28 import org.thingsboard.server.dao.exception.IncorrectParameterException;
... ... @@ -48,7 +49,7 @@ public class BaseTimeseriesService implements TimeseriesService {
48 49 @Override
49 50 public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries) {
50 51 validate(entityId);
51   - queries.forEach(query -> validate(query));
  52 + queries.forEach(BaseTimeseriesService::validate);
52 53 return timeseriesDao.findAllAsync(entityId, queries);
53 54 }
54 55
... ... @@ -97,17 +98,17 @@ public class BaseTimeseriesService implements TimeseriesService {
97 98 }
98 99
99 100 @Override
100   - public ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> tsKvQueries) {
  101 + public ListenableFuture<List<Void>> remove(EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
101 102 validate(entityId);
102   - tsKvQueries.forEach(BaseTimeseriesService::validate);
103   - List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvQueries.size() * DELETES_PER_ENTRY);
104   - for (TsKvQuery tsKvQuery : tsKvQueries) {
  103 + deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
  104 + List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
  105 + for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
105 106 deleteAndRegisterFutures(futures, entityId, tsKvQuery);
106 107 }
107 108 return Futures.allAsList(futures);
108 109 }
109 110
110   - private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvQuery query) {
  111 + private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
111 112 futures.add(timeseriesDao.remove(entityId, query));
112 113 futures.add(timeseriesDao.removeLatest(entityId, query));
113 114 futures.add(timeseriesDao.removePartition(entityId, query));
... ... @@ -126,4 +127,12 @@ public class BaseTimeseriesService implements TimeseriesService {
126 127 throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
127 128 }
128 129 }
  130 +
  131 + private static void validate(DeleteTsKvQuery query) {
  132 + if (query == null) {
  133 + throw new IncorrectParameterException("DeleteTsKvQuery can't be null");
  134 + } else if (isBlank(query.getKey())) {
  135 + throw new IncorrectParameterException("Incorrect DeleteTsKvQuery. Key can't be empty");
  136 + }
  137 + }
129 138 }
... ...
... ... @@ -138,7 +138,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
138 138 while (stepTs < query.getEndTs()) {
139 139 long startTs = stepTs;
140 140 long endTs = stepTs + step;
141   - TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy(), false);
  141 + TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
142 142 futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
143 143 stepTs = endTs;
144 144 }
... ... @@ -346,7 +346,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
346 346 }
347 347
348 348 @Override
349   - public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
  349 + public ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query) {
350 350 long minPartition = toPartitionTs(query.getStartTs());
351 351 long maxPartition = toPartitionTs(query.getEndTs());
352 352
... ... @@ -358,7 +358,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
358 358 Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
359 359 @Override
360 360 public void onSuccess(@Nullable List<Long> partitions) {
361   - TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
  361 + QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
362 362 deleteAsync(cursor, resultFuture);
363 363 }
364 364
... ... @@ -370,7 +370,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
370 370 return resultFuture;
371 371 }
372 372
373   - private void deleteAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
  373 + private void deleteAsync(final QueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
374 374 if (!cursor.hasNextPartition()) {
375 375 resultFuture.set(null);
376 376 } else {
... ... @@ -411,7 +411,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
411 411 }
412 412
413 413 @Override
414   - public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
  414 + public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
415 415 ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
416 416
417 417 ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
... ... @@ -445,11 +445,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
445 445 return removedLatestFuture;
446 446 }
447 447
448   - private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
  448 + private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
449 449 long startTs = 0;
450 450 long endTs = query.getStartTs() - 1;
451 451 TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
452   - Aggregation.NONE, DESC_ORDER, false);
  452 + Aggregation.NONE, DESC_ORDER);
453 453 ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
454 454
455 455 return Futures.transformAsync(future, entryList -> {
... ... @@ -472,7 +472,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
472 472 }
473 473
474 474 @Override
475   - public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
  475 + public ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query) {
476 476 long minPartition = toPartitionTs(query.getStartTs());
477 477 long maxPartition = toPartitionTs(query.getEndTs());
478 478 if (minPartition == maxPartition) {
... ... @@ -494,7 +494,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
494 494 for (int i = index; i < partitions.size() - 1; i++) {
495 495 partitionsToDelete.add(partitions.get(i));
496 496 }
497   - TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
  497 + QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
498 498 deletePartitionAsync(cursor, resultFuture);
499 499 }
500 500
... ... @@ -507,7 +507,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
507 507 }
508 508 }
509 509
510   - private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
  510 + private void deletePartitionAsync(final QueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
511 511 if (!cursor.hasNextPartition()) {
512 512 resultFuture.set(null);
513 513 } else {
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import lombok.Getter;
  19 +import org.thingsboard.server.common.data.kv.Query;
  20 +
  21 +import java.util.List;
  22 +import java.util.UUID;
  23 +
  24 +public class QueryCursor {
  25 +
  26 + @Getter
  27 + protected final String entityType;
  28 + @Getter
  29 + protected final UUID entityId;
  30 + @Getter
  31 + protected final String key;
  32 + @Getter
  33 + private final long startTs;
  34 + @Getter
  35 + private final long endTs;
  36 +
  37 + final List<Long> partitions;
  38 + private int partitionIndex;
  39 +
  40 + public QueryCursor(String entityType, UUID entityId, Query baseQuery, List<Long> partitions) {
  41 + this.entityType = entityType;
  42 + this.entityId = entityId;
  43 + this.key = baseQuery.getKey();
  44 + this.startTs = baseQuery.getStartTs();
  45 + this.endTs = baseQuery.getEndTs();
  46 + this.partitions = partitions;
  47 + this.partitionIndex = partitions.size() - 1;
  48 + }
  49 +
  50 + public boolean hasNextPartition() {
  51 + return partitionIndex >= 0;
  52 + }
  53 +
  54 + public long getNextPartition() {
  55 + long partition = partitions.get(partitionIndex);
  56 + partitionIndex--;
  57 + return partition;
  58 + }
  59 +
  60 +}
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.id.EntityId;
  20 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
20 21 import org.thingsboard.server.common.data.kv.TsKvEntry;
21 22 import org.thingsboard.server.common.data.kv.TsKvQuery;
22 23
... ... @@ -39,9 +40,9 @@ public interface TimeseriesDao {
39 40
40 41 ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
41 42
42   - ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query);
  43 + ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query);
43 44
44   - ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query);
  45 + ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query);
45 46
46   - ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query);
  47 + ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query);
47 48 }
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.timeseries;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.id.EntityId;
  20 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
20 21 import org.thingsboard.server.common.data.kv.TsKvEntry;
21 22 import org.thingsboard.server.common.data.kv.TsKvQuery;
22 23
... ... @@ -38,5 +39,5 @@ public interface TimeseriesService {
38 39
39 40 ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
40 41
41   - ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> queries);
  42 + ListenableFuture<List<Void>> remove(EntityId entityId, List<DeleteTsKvQuery> queries);
42 43 }
... ...
... ... @@ -28,18 +28,8 @@ import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.D
28 28 /**
29 29 * Created by ashvayka on 21.02.17.
30 30 */
31   -public class TsKvQueryCursor {
32   - @Getter
33   - private final String entityType;
34   - @Getter
35   - private final UUID entityId;
36   - @Getter
37   - private final String key;
38   - @Getter
39   - private final long startTs;
40   - @Getter
41   - private final long endTs;
42   - private final List<Long> partitions;
  31 +public class TsKvQueryCursor extends QueryCursor {
  32 +
43 33 @Getter
44 34 private final List<TsKvEntry> data;
45 35 @Getter
... ... @@ -49,18 +39,14 @@ public class TsKvQueryCursor {
49 39 private int currentLimit;
50 40
51 41 public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
52   - this.entityType = entityType;
53   - this.entityId = entityId;
54   - this.key = baseQuery.getKey();
55   - this.startTs = baseQuery.getStartTs();
56   - this.endTs = baseQuery.getEndTs();
57   - this.partitions = partitions;
  42 + super(entityType, entityId, baseQuery, partitions);
58 43 this.orderBy = baseQuery.getOrderBy();
59 44 this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
60 45 this.data = new ArrayList<>();
61 46 this.currentLimit = baseQuery.getLimit();
62 47 }
63 48
  49 + @Override
64 50 public boolean hasNextPartition() {
65 51 return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1;
66 52 }
... ... @@ -69,6 +55,7 @@ public class TsKvQueryCursor {
69 55 return currentLimit <= 0;
70 56 }
71 57
  58 + @Override
72 59 public long getNextPartition() {
73 60 long partition = partitions.get(partitionIndex);
74 61 if (isDesc()) {
... ...
... ... @@ -20,15 +20,7 @@ import lombok.extern.slf4j.Slf4j;
20 20 import org.junit.Assert;
21 21 import org.junit.Test;
22 22 import org.thingsboard.server.common.data.id.DeviceId;
23   -import org.thingsboard.server.common.data.kv.Aggregation;
24   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
25   -import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
26   -import org.thingsboard.server.common.data.kv.BooleanDataEntry;
27   -import org.thingsboard.server.common.data.kv.DoubleDataEntry;
28   -import org.thingsboard.server.common.data.kv.KvEntry;
29   -import org.thingsboard.server.common.data.kv.LongDataEntry;
30   -import org.thingsboard.server.common.data.kv.StringDataEntry;
31   -import org.thingsboard.server.common.data.kv.TsKvEntry;
  23 +import org.thingsboard.server.common.data.kv.*;
32 24 import org.thingsboard.server.dao.service.AbstractServiceTest;
33 25
34 26 import java.util.ArrayList;
... ... @@ -111,12 +103,10 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
111 103 saveEntries(deviceId, 40000);
112 104
113 105 tsService.remove(deviceId, Collections.singletonList(
114   - new BaseTsKvQuery(STRING_KEY, 15000, 45000, 10000, 0, Aggregation.NONE, DESC_ORDER,
115   - false))).get();
  106 + new BaseDeleteTsKvQuery(STRING_KEY, 15000, 45000))).get();
116 107
117 108 List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
118   - new BaseTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE, DESC_ORDER,
119   - false))).get();
  109 + new BaseTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE, DESC_ORDER))).get();
120 110 Assert.assertEquals(1, list.size());
121 111
122 112 List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
... ... @@ -138,7 +128,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
138 128 entries.add(save(deviceId, 55000, 600));
139 129
140 130 List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
141   - 60000, 20000, 3, Aggregation.NONE, DESC_ORDER, false))).get();
  131 + 60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get();
142 132 assertEquals(3, list.size());
143 133 assertEquals(55000, list.get(0).getTs());
144 134 assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
... ... @@ -150,7 +140,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
150 140 assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
151 141
152 142 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
153   - 60000, 20000, 3, Aggregation.AVG, DESC_ORDER, false))).get();
  143 + 60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get();
154 144 assertEquals(3, list.size());
155 145 assertEquals(10000, list.get(0).getTs());
156 146 assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
... ... @@ -162,7 +152,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
162 152 assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
163 153
164 154 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
165   - 60000, 20000, 3, Aggregation.SUM, DESC_ORDER, false))).get();
  155 + 60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get();
166 156
167 157 assertEquals(3, list.size());
168 158 assertEquals(10000, list.get(0).getTs());
... ... @@ -175,7 +165,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
175 165 assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
176 166
177 167 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
178   - 60000, 20000, 3, Aggregation.MIN, DESC_ORDER, false))).get();
  168 + 60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get();
179 169
180 170 assertEquals(3, list.size());
181 171 assertEquals(10000, list.get(0).getTs());
... ... @@ -188,7 +178,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
188 178 assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
189 179
190 180 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
191   - 60000, 20000, 3, Aggregation.MAX, DESC_ORDER, false))).get();
  181 + 60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get();
192 182
193 183 assertEquals(3, list.size());
194 184 assertEquals(10000, list.get(0).getTs());
... ... @@ -201,7 +191,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
201 191 assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
202 192
203 193 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
204   - 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER, false))).get();
  194 + 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get();
205 195
206 196 assertEquals(3, list.size());
207 197 assertEquals(10000, list.get(0).getTs());
... ...