Commit a12a1ebea2bceacb4c75837e3cd0218d1ebd915b

Authored by VoBa
Committed by GitHub
2 parents a1abbd23 29fc2386

Merge pull request #1055 from dmytro-landiak/master

OrderBy Query & Delete timeseries
Showing 19 changed files with 620 additions and 130 deletions
... ... @@ -76,7 +76,7 @@ public class DeviceController extends BaseController {
76 76 device.setTenantId(getCurrentUser().getTenantId());
77 77 if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
78 78 if (device.getId() == null || device.getId().isNullUid() ||
79   - device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
  79 + device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
80 80 throw new ThingsboardException("You don't have permission to perform this operation!",
81 81 ThingsboardErrorCode.PERMISSION_DENIED);
82 82 } else {
... ...
... ... @@ -49,15 +49,15 @@ import org.thingsboard.server.common.data.kv.Aggregation;
49 49 import org.thingsboard.server.common.data.kv.AttributeKey;
50 50 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
51 51 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
52   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  52 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
53 53 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
54 54 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
55 55 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
56 56 import org.thingsboard.server.common.data.kv.KvEntry;
57 57 import org.thingsboard.server.common.data.kv.LongDataEntry;
  58 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
58 59 import org.thingsboard.server.common.data.kv.StringDataEntry;
59 60 import org.thingsboard.server.common.data.kv.TsKvEntry;
60   -import org.thingsboard.server.common.data.kv.TsKvQuery;
61 61 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
62 62 import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
63 63 import org.thingsboard.server.common.transport.adaptor.JsonConverter;
... ... @@ -81,7 +81,6 @@ import java.util.LinkedHashMap;
81 81 import java.util.List;
82 82 import java.util.Map;
83 83 import java.util.Set;
84   -import java.util.UUID;
85 84 import java.util.concurrent.ExecutorService;
86 85 import java.util.concurrent.Executors;
87 86 import java.util.stream.Collectors;
... ... @@ -201,7 +200,7 @@ public class TelemetryController extends BaseController {
201 200 (result, entityId) -> {
202 201 // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
203 202 Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
204   - List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg))
  203 + List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg))
205 204 .collect(Collectors.toList());
206 205
207 206 Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
... ...
... ... @@ -35,16 +35,16 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
35 35 import org.thingsboard.server.common.data.id.TenantId;
36 36 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
37 37 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
38   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  38 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
39 39 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
40 40 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
41 41 import org.thingsboard.server.common.data.kv.DataType;
42 42 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
43 43 import org.thingsboard.server.common.data.kv.KvEntry;
44 44 import org.thingsboard.server.common.data.kv.LongDataEntry;
  45 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
45 46 import org.thingsboard.server.common.data.kv.StringDataEntry;
46 47 import org.thingsboard.server.common.data.kv.TsKvEntry;
47   -import org.thingsboard.server.common.data.kv.TsKvQuery;
48 48 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
49 49 import org.thingsboard.server.common.msg.cluster.ServerAddress;
50 50 import org.thingsboard.server.dao.attributes.AttributesService;
... ... @@ -370,9 +370,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
370 370 e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor);
371 371 } else if (subscription.getType() == TelemetryFeature.TIMESERIES) {
372 372 long curTs = System.currentTimeMillis();
373   - List<TsKvQuery> queries = new ArrayList<>();
  373 + List<ReadTsKvQuery> queries = new ArrayList<>();
374 374 subscription.getKeyStates().entrySet().forEach(e -> {
375   - queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
  375 + queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
376 376 });
377 377
378 378 DonAsynchron.withCallback(tsService.findAll(entityId, queries),
... ...
... ... @@ -30,10 +30,10 @@ import org.thingsboard.server.common.data.id.EntityId;
30 30 import org.thingsboard.server.common.data.id.EntityIdFactory;
31 31 import org.thingsboard.server.common.data.kv.Aggregation;
32 32 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
33   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  33 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
34 34 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  35 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
35 36 import org.thingsboard.server.common.data.kv.TsKvEntry;
36   -import org.thingsboard.server.common.data.kv.TsKvQuery;
37 37 import org.thingsboard.server.dao.attributes.AttributesService;
38 38 import org.thingsboard.server.dao.timeseries.TimeseriesService;
39 39 import org.thingsboard.server.service.security.AccessValidator;
... ... @@ -251,7 +251,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
251 251 }
252 252 EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
253 253 List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
254   - List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
  254 + List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
255 255 .collect(Collectors.toList());
256 256
257 257 FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
... ... @@ -337,7 +337,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
337 337 log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
338 338 startTs = cmd.getStartTs();
339 339 long endTs = cmd.getStartTs() + cmd.getTimeWindow();
340   - List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
  340 + List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(),
341 341 getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
342 342
343 343 final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.kv;
  17 +
  18 +import lombok.Data;
  19 +
  20 +@Data
  21 +public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuery {
  22 +
  23 + private final Boolean rewriteLatestIfDeleted;
  24 +
  25 + public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted) {
  26 + super(key, startTs, endTs);
  27 + this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
  28 + }
  29 +
  30 + public BaseDeleteTsKvQuery(String key, long startTs, long endTs) {
  31 + this(key, startTs, endTs, false);
  32 + }
  33 +
  34 +
  35 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.kv;
  17 +
  18 +import lombok.Data;
  19 +
  20 +@Data
  21 +public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
  22 +
  23 + private final long interval;
  24 + private final int limit;
  25 + private final Aggregation aggregation;
  26 + private final String orderBy;
  27 +
  28 + public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
  29 + this(key, startTs, endTs, interval, limit, aggregation, "DESC");
  30 + }
  31 +
  32 + public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation,
  33 + String orderBy) {
  34 + super(key, startTs, endTs);
  35 + this.interval = interval;
  36 + this.limit = limit;
  37 + this.aggregation = aggregation;
  38 + this.orderBy = orderBy;
  39 + }
  40 +
  41 + public BaseReadTsKvQuery(String key, long startTs, long endTs) {
  42 + this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
  43 + }
  44 +
  45 +}
... ...
... ... @@ -23,21 +23,11 @@ public class BaseTsKvQuery implements TsKvQuery {
23 23 private final String key;
24 24 private final long startTs;
25 25 private final long endTs;
26   - private final long interval;
27   - private final int limit;
28   - private final Aggregation aggregation;
29 26
30   - public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
  27 + public BaseTsKvQuery(String key, long startTs, long endTs) {
31 28 this.key = key;
32 29 this.startTs = startTs;
33 30 this.endTs = endTs;
34   - this.interval = interval;
35   - this.limit = limit;
36   - this.aggregation = aggregation;
37   - }
38   -
39   - public BaseTsKvQuery(String key, long startTs, long endTs) {
40   - this(key, startTs, endTs, endTs-startTs, 1, Aggregation.AVG);
41 31 }
42 32
43 33 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.kv;
  17 +
  18 +public interface DeleteTsKvQuery extends TsKvQuery {
  19 +
  20 + Boolean getRewriteLatestIfDeleted();
  21 +
  22 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.kv;
  17 +
  18 +public interface ReadTsKvQuery extends TsKvQuery {
  19 +
  20 + long getInterval();
  21 +
  22 + int getLimit();
  23 +
  24 + Aggregation getAggregation();
  25 +
  26 + String getOrderBy();
  27 +
  28 +}
... ...
... ... @@ -23,10 +23,4 @@ public interface TsKvQuery {
23 23
24 24 long getEndTs();
25 25
26   - long getInterval();
27   -
28   - int getLimit();
29   -
30   - Aggregation getAggregation();
31   -
32 26 }
... ...
... ... @@ -31,9 +31,10 @@ import org.thingsboard.server.common.data.UUIDConverter;
31 31 import org.thingsboard.server.common.data.id.EntityId;
32 32 import org.thingsboard.server.common.data.kv.Aggregation;
33 33 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
  34 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  35 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
34 36 import org.thingsboard.server.common.data.kv.StringDataEntry;
35 37 import org.thingsboard.server.common.data.kv.TsKvEntry;
36   -import org.thingsboard.server.common.data.kv.TsKvQuery;
37 38 import org.thingsboard.server.dao.DaoUtil;
38 39 import org.thingsboard.server.dao.model.sql.TsKvEntity;
39 40 import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
... ... @@ -102,7 +103,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
102 103 }
103 104
104 105 @Override
105   - public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
  106 + public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries) {
106 107 List<ListenableFuture<List<TsKvEntry>>> futures = queries
107 108 .stream()
108 109 .map(query -> findAllAsync(entityId, query))
... ... @@ -121,7 +122,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
121 122 }, service);
122 123 }
123 124
124   - private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, TsKvQuery query) {
  125 + private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query) {
125 126 if (query.getAggregation() == Aggregation.NONE) {
126 127 return findAllAsyncWithLimit(entityId, query);
127 128 } else {
... ... @@ -228,7 +229,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
228 229 });
229 230 }
230 231
231   - private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
  232 + private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
232 233 return Futures.immediateFuture(
233 234 DaoUtil.convertDataList(
234 235 tsKvRepository.findAllWithLimit(
... ... @@ -306,6 +307,36 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
306 307 });
307 308 }
308 309
  310 + @Override
  311 + public ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query) {
  312 + return service.submit(() -> {
  313 + tsKvRepository.delete(
  314 + fromTimeUUID(entityId.getId()),
  315 + entityId.getEntityType(),
  316 + query.getKey(),
  317 + query.getStartTs(),
  318 + query.getEndTs());
  319 + return null;
  320 + });
  321 + }
  322 +
  323 + @Override
  324 + public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
  325 + TsKvLatestEntity latestEntity = new TsKvLatestEntity();
  326 + latestEntity.setEntityType(entityId.getEntityType());
  327 + latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
  328 + latestEntity.setKey(query.getKey());
  329 + return service.submit(() -> {
  330 + tsKvLatestRepository.delete(latestEntity);
  331 + return null;
  332 + });
  333 + }
  334 +
  335 + @Override
  336 + public ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query) {
  337 + return service.submit(() -> null);
  338 + }
  339 +
309 340 @PreDestroy
310 341 void onDestroy() {
311 342 if (insertService != null) {
... ...
... ... @@ -16,10 +16,12 @@
16 16 package org.thingsboard.server.dao.sql.timeseries;
17 17
18 18 import org.springframework.data.domain.Pageable;
  19 +import org.springframework.data.jpa.repository.Modifying;
19 20 import org.springframework.data.jpa.repository.Query;
20 21 import org.springframework.data.repository.CrudRepository;
21 22 import org.springframework.data.repository.query.Param;
22 23 import org.springframework.scheduling.annotation.Async;
  24 +import org.springframework.transaction.annotation.Transactional;
23 25 import org.thingsboard.server.common.data.EntityType;
24 26 import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
25 27 import org.thingsboard.server.dao.model.sql.TsKvEntity;
... ... @@ -41,6 +43,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
41 43 @Param("endTs") long endTs,
42 44 Pageable pageable);
43 45
  46 + @Transactional
  47 + @Modifying
  48 + @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
  49 + "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
  50 + "AND tskv.ts > :startTs AND tskv.ts < :endTs")
  51 + void delete(@Param("entityId") String entityId,
  52 + @Param("entityType") EntityType entityType,
  53 + @Param("entityKey") String key,
  54 + @Param("startTs") long startTs,
  55 + @Param("endTs") long endTs);
  56 +
44 57 @Async
45 58 @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
46 59 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
... ... @@ -56,30 +69,30 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
56 69 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
57 70 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
58 71 CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
59   - @Param("entityType") EntityType entityType,
60   - @Param("entityKey") String entityKey,
61   - @Param("startTs") long startTs,
62   - @Param("endTs") long endTs);
  72 + @Param("entityType") EntityType entityType,
  73 + @Param("entityKey") String entityKey,
  74 + @Param("startTs") long startTs,
  75 + @Param("endTs") long endTs);
63 76
64 77 @Async
65 78 @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
66 79 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
67 80 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
68 81 CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
69   - @Param("entityType") EntityType entityType,
70   - @Param("entityKey") String entityKey,
71   - @Param("startTs") long startTs,
72   - @Param("endTs") long endTs);
  82 + @Param("entityType") EntityType entityType,
  83 + @Param("entityKey") String entityKey,
  84 + @Param("startTs") long startTs,
  85 + @Param("endTs") long endTs);
73 86
74 87 @Async
75 88 @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
76 89 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
77 90 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
78 91 CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
79   - @Param("entityType") EntityType entityType,
80   - @Param("entityKey") String entityKey,
81   - @Param("startTs") long startTs,
82   - @Param("endTs") long endTs);
  92 + @Param("entityType") EntityType entityType,
  93 + @Param("entityKey") String entityKey,
  94 + @Param("startTs") long startTs,
  95 + @Param("endTs") long endTs);
83 96
84 97
85 98 @Async
... ... @@ -87,8 +100,8 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
87 100 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
88 101 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
89 102 CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
90   - @Param("entityType") EntityType entityType,
91   - @Param("entityKey") String entityKey,
92   - @Param("startTs") long startTs,
93   - @Param("endTs") long endTs);
  103 + @Param("entityType") EntityType entityType,
  104 + @Param("entityKey") String entityKey,
  105 + @Param("startTs") long startTs,
  106 + @Param("endTs") long endTs);
94 107 }
... ...
... ... @@ -22,8 +22,9 @@ import lombok.extern.slf4j.Slf4j;
22 22 import org.springframework.beans.factory.annotation.Autowired;
23 23 import org.springframework.stereotype.Service;
24 24 import org.thingsboard.server.common.data.id.EntityId;
  25 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  26 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
25 27 import org.thingsboard.server.common.data.kv.TsKvEntry;
26   -import org.thingsboard.server.common.data.kv.TsKvQuery;
27 28 import org.thingsboard.server.dao.exception.IncorrectParameterException;
28 29 import org.thingsboard.server.dao.service.Validator;
29 30
... ... @@ -40,14 +41,15 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
40 41 public class BaseTimeseriesService implements TimeseriesService {
41 42
42 43 public static final int INSERTS_PER_ENTRY = 3;
  44 + public static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
43 45
44 46 @Autowired
45 47 private TimeseriesDao timeseriesDao;
46 48
47 49 @Override
48   - public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries) {
  50 + public ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<ReadTsKvQuery> queries) {
49 51 validate(entityId);
50   - queries.forEach(query -> validate(query));
  52 + queries.forEach(BaseTimeseriesService::validate);
51 53 return timeseriesDao.findAllAsync(entityId, queries);
52 54 }
53 55
... ... @@ -95,17 +97,42 @@ public class BaseTimeseriesService implements TimeseriesService {
95 97 futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl));
96 98 }
97 99
  100 + @Override
  101 + public ListenableFuture<List<Void>> remove(EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
  102 + validate(entityId);
  103 + deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
  104 + List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
  105 + for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
  106 + deleteAndRegisterFutures(futures, entityId, tsKvQuery);
  107 + }
  108 + return Futures.allAsList(futures);
  109 + }
  110 +
  111 + private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
  112 + futures.add(timeseriesDao.remove(entityId, query));
  113 + futures.add(timeseriesDao.removeLatest(entityId, query));
  114 + futures.add(timeseriesDao.removePartition(entityId, query));
  115 + }
  116 +
98 117 private static void validate(EntityId entityId) {
99 118 Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
100 119 }
101 120
102   - private static void validate(TsKvQuery query) {
  121 + private static void validate(ReadTsKvQuery query) {
103 122 if (query == null) {
104   - throw new IncorrectParameterException("TsKvQuery can't be null");
  123 + throw new IncorrectParameterException("ReadTsKvQuery can't be null");
105 124 } else if (isBlank(query.getKey())) {
106   - throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
  125 + throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Key can't be empty");
107 126 } else if (query.getAggregation() == null) {
108   - throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
  127 + throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty");
  128 + }
  129 + }
  130 +
  131 + private static void validate(DeleteTsKvQuery query) {
  132 + if (query == null) {
  133 + throw new IncorrectParameterException("DeleteTsKvQuery can't be null");
  134 + } else if (isBlank(query.getKey())) {
  135 + throw new IncorrectParameterException("Incorrect DeleteTsKvQuery. Key can't be empty");
109 136 }
110 137 }
111 138 }
... ...
... ... @@ -20,6 +20,7 @@ import com.datastax.driver.core.PreparedStatement;
20 20 import com.datastax.driver.core.ResultSet;
21 21 import com.datastax.driver.core.ResultSetFuture;
22 22 import com.datastax.driver.core.Row;
  23 +import com.datastax.driver.core.Statement;
23 24 import com.datastax.driver.core.querybuilder.QueryBuilder;
24 25 import com.datastax.driver.core.querybuilder.Select;
25 26 import com.google.common.base.Function;
... ... @@ -34,16 +35,17 @@ import org.springframework.core.env.Environment;
34 35 import org.springframework.stereotype.Component;
35 36 import org.thingsboard.server.common.data.id.EntityId;
36 37 import org.thingsboard.server.common.data.kv.Aggregation;
37   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  38 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
38 39 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
39 40 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
40 41 import org.thingsboard.server.common.data.kv.DataType;
  42 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
41 43 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
42 44 import org.thingsboard.server.common.data.kv.KvEntry;
43 45 import org.thingsboard.server.common.data.kv.LongDataEntry;
  46 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
44 47 import org.thingsboard.server.common.data.kv.StringDataEntry;
45 48 import org.thingsboard.server.common.data.kv.TsKvEntry;
46   -import org.thingsboard.server.common.data.kv.TsKvQuery;
47 49 import org.thingsboard.server.dao.model.ModelConstants;
48 50 import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao;
49 51 import org.thingsboard.server.dao.util.NoSqlDao;
... ... @@ -54,11 +56,11 @@ import javax.annotation.PreDestroy;
54 56 import java.time.Instant;
55 57 import java.time.LocalDateTime;
56 58 import java.time.ZoneOffset;
  59 +import java.util.ArrayList;
57 60 import java.util.Arrays;
  61 +import java.util.Collections;
58 62 import java.util.List;
59   -import java.util.ArrayList;
60 63 import java.util.Optional;
61   -import java.util.Collections;
62 64 import java.util.stream.Collectors;
63 65
64 66 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
... ... @@ -76,8 +78,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
76 78 public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}";
77 79 public static final String SELECT_PREFIX = "SELECT ";
78 80 public static final String EQUALS_PARAM = " = ? ";
79   -
80   -
  81 + public static final String ASC_ORDER = "ASC";
  82 + public static final String DESC_ORDER = "DESC";
81 83 private static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
82 84
83 85 @Autowired
... ... @@ -96,9 +98,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
96 98 private PreparedStatement latestInsertStmt;
97 99 private PreparedStatement[] saveStmts;
98 100 private PreparedStatement[] saveTtlStmts;
99   - private PreparedStatement[] fetchStmts;
  101 + private PreparedStatement[] fetchStmtsAsc;
  102 + private PreparedStatement[] fetchStmtsDesc;
100 103 private PreparedStatement findLatestStmt;
101 104 private PreparedStatement findAllLatestStmt;
  105 + private PreparedStatement deleteStmt;
  106 + private PreparedStatement deletePartitionStmt;
102 107
103 108 private boolean isInstall() {
104 109 return environment.acceptsProfiles("install");
... ... @@ -108,7 +113,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
108 113 public void init() {
109 114 super.startExecutor();
110 115 if (!isInstall()) {
111   - getFetchStmt(Aggregation.NONE);
  116 + getFetchStmt(Aggregation.NONE, DESC_ORDER);
112 117 Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
113 118 if (partition.isPresent()) {
114 119 tsFormat = partition.get();
... ... @@ -125,7 +130,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
125 130 }
126 131
127 132 @Override
128   - public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
  133 + public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries) {
129 134 List<ListenableFuture<List<TsKvEntry>>> futures = queries.stream().map(query -> findAllAsync(entityId, query)).collect(Collectors.toList());
130 135 return Futures.transform(Futures.allAsList(futures), new Function<List<List<TsKvEntry>>, List<TsKvEntry>>() {
131 136 @Nullable
... ... @@ -142,7 +147,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
142 147 }
143 148
144 149
145   - private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, TsKvQuery query) {
  150 + private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, ReadTsKvQuery query) {
146 151 if (query.getAggregation() == Aggregation.NONE) {
147 152 return findAllAsyncWithLimit(entityId, query);
148 153 } else {
... ... @@ -152,7 +157,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
152 157 while (stepTs < query.getEndTs()) {
153 158 long startTs = stepTs;
154 159 long endTs = stepTs + step;
155   - TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation());
  160 + ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
156 161 futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
157 162 stepTs = endTs;
158 163 }
... ... @@ -171,7 +176,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
171 176 return tsFormat.getTruncateUnit().equals(TsPartitionDate.EPOCH_START);
172 177 }
173 178
174   - private ListenableFuture<List<Long>> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
  179 + private ListenableFuture<List<Long>> getPartitionsFuture(ReadTsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
175 180 if (isFixedPartitioning()) { //no need to fetch partitions from DB
176 181 return Futures.immediateFuture(FIXED_PARTITION);
177 182 }
... ... @@ -179,11 +184,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
179 184 return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
180 185 }
181 186
182   - private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
183   -
  187 + private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
184 188 long minPartition = toPartitionTs(query.getStartTs());
185 189 long maxPartition = toPartitionTs(query.getEndTs());
186   -
187 190 final ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
188 191 final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>();
189 192
... ... @@ -212,7 +215,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
212 215 if (cursor.isFull() || !cursor.hasNextPartition()) {
213 216 resultFuture.set(cursor.getData());
214 217 } else {
215   - PreparedStatement proto = getFetchStmt(Aggregation.NONE);
  218 + PreparedStatement proto = getFetchStmt(Aggregation.NONE, cursor.getOrderBy());
216 219 BoundStatement stmt = proto.bind();
217 220 stmt.setString(0, cursor.getEntityType());
218 221 stmt.setUUID(1, cursor.getEntityId());
... ... @@ -237,14 +240,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
237 240 }
238 241 }
239 242
240   - private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, TsKvQuery query, long minPartition, long maxPartition) {
  243 + private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, ReadTsKvQuery query, long minPartition, long maxPartition) {
241 244 final Aggregation aggregation = query.getAggregation();
242 245 final String key = query.getKey();
243 246 final long startTs = query.getStartTs();
244 247 final long endTs = query.getEndTs();
245 248 final long ts = startTs + (endTs - startTs) / 2;
246   -
247   -
248 249 ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
249 250 ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
250 251 getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
... ... @@ -260,7 +261,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
260 261 private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
261 262 return partitions -> {
262 263 try {
263   - PreparedStatement proto = getFetchStmt(aggregation);
  264 + PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER);
264 265 List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
265 266 for (Long partition : partitions) {
266 267 log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId());
... ... @@ -363,6 +364,204 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
363 364 return getFuture(executeAsyncWrite(stmt), rs -> null);
364 365 }
365 366
  367 + @Override
  368 + public ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query) {
  369 + long minPartition = toPartitionTs(query.getStartTs());
  370 + long maxPartition = toPartitionTs(query.getEndTs());
  371 +
  372 + ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
  373 +
  374 + final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
  375 + final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
  376 +
  377 + Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
  378 + @Override
  379 + public void onSuccess(@Nullable List<Long> partitions) {
  380 + QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
  381 + deleteAsync(cursor, resultFuture);
  382 + }
  383 +
  384 + @Override
  385 + public void onFailure(Throwable t) {
  386 + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
  387 + }
  388 + }, readResultsProcessingExecutor);
  389 + return resultFuture;
  390 + }
  391 +
  392 + private void deleteAsync(final QueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
  393 + if (!cursor.hasNextPartition()) {
  394 + resultFuture.set(null);
  395 + } else {
  396 + PreparedStatement proto = getDeleteStmt();
  397 + BoundStatement stmt = proto.bind();
  398 + stmt.setString(0, cursor.getEntityType());
  399 + stmt.setUUID(1, cursor.getEntityId());
  400 + stmt.setString(2, cursor.getKey());
  401 + stmt.setLong(3, cursor.getNextPartition());
  402 + stmt.setLong(4, cursor.getStartTs());
  403 + stmt.setLong(5, cursor.getEndTs());
  404 +
  405 + Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
  406 + @Override
  407 + public void onSuccess(@Nullable ResultSet result) {
  408 + deleteAsync(cursor, resultFuture);
  409 + }
  410 +
  411 + @Override
  412 + public void onFailure(Throwable t) {
  413 + log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
  414 + }
  415 + }, readResultsProcessingExecutor);
  416 + }
  417 + }
  418 +
  419 + private PreparedStatement getDeleteStmt() {
  420 + if (deleteStmt == null) {
  421 + deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
  422 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
  423 + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
  424 + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
  425 + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
  426 + + "AND " + ModelConstants.TS_COLUMN + " > ? "
  427 + + "AND " + ModelConstants.TS_COLUMN + " <= ?");
  428 + }
  429 + return deleteStmt;
  430 + }
  431 +
  432 + @Override
  433 + public ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query) {
  434 + ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
  435 +
  436 + ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
  437 + long ts = latestEntry.getTs();
  438 + if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
  439 + return Futures.immediateFuture(true);
  440 + } else {
  441 + log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
  442 + }
  443 + return Futures.immediateFuture(false);
  444 + }, readResultsProcessingExecutor);
  445 +
  446 + ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
  447 + if (isRemove) {
  448 + return deleteLatest(entityId, query.getKey());
  449 + }
  450 + return Futures.immediateFuture(null);
  451 + }, readResultsProcessingExecutor);
  452 +
  453 + if (query.getRewriteLatestIfDeleted()) {
  454 + ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
  455 + if (isRemove) {
  456 + return getNewLatestEntryFuture(entityId, query);
  457 + }
  458 + return Futures.immediateFuture(null);
  459 + }, readResultsProcessingExecutor);
  460 +
  461 + return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
  462 + list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
  463 + }
  464 + return removedLatestFuture;
  465 + }
  466 +
  467 + private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) {
  468 + long startTs = 0;
  469 + long endTs = query.getStartTs() - 1;
  470 + ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
  471 + Aggregation.NONE, DESC_ORDER);
  472 + ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
  473 +
  474 + return Futures.transformAsync(future, entryList -> {
  475 + if (entryList.size() == 1) {
  476 + return saveLatest(entityId, entryList.get(0));
  477 + } else {
  478 + log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
  479 + }
  480 + return Futures.immediateFuture(null);
  481 + }, readResultsProcessingExecutor);
  482 + }
  483 +
  484 + private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
  485 + Statement delete = QueryBuilder.delete().all().from(ModelConstants.TS_KV_LATEST_CF)
  486 + .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
  487 + .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
  488 + .and(eq(ModelConstants.KEY_COLUMN, key));
  489 + log.debug("Remove request: {}", delete.toString());
  490 + return getFuture(executeAsyncWrite(delete), rs -> null);
  491 + }
  492 +
  493 + @Override
  494 + public ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query) {
  495 + long minPartition = toPartitionTs(query.getStartTs());
  496 + long maxPartition = toPartitionTs(query.getEndTs());
  497 + if (minPartition == maxPartition) {
  498 + return Futures.immediateFuture(null);
  499 + } else {
  500 + ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
  501 +
  502 + final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
  503 + final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
  504 +
  505 + Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
  506 + @Override
  507 + public void onSuccess(@Nullable List<Long> partitions) {
  508 + int index = 0;
  509 + if (minPartition != query.getStartTs()) {
  510 + index = 1;
  511 + }
  512 + List<Long> partitionsToDelete = new ArrayList<>();
  513 + for (int i = index; i < partitions.size() - 1; i++) {
  514 + partitionsToDelete.add(partitions.get(i));
  515 + }
  516 + QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
  517 + deletePartitionAsync(cursor, resultFuture);
  518 + }
  519 +
  520 + @Override
  521 + public void onFailure(Throwable t) {
  522 + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
  523 + }
  524 + }, readResultsProcessingExecutor);
  525 + return resultFuture;
  526 + }
  527 + }
  528 +
  529 + private void deletePartitionAsync(final QueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
  530 + if (!cursor.hasNextPartition()) {
  531 + resultFuture.set(null);
  532 + } else {
  533 + PreparedStatement proto = getDeletePartitionStmt();
  534 + BoundStatement stmt = proto.bind();
  535 + stmt.setString(0, cursor.getEntityType());
  536 + stmt.setUUID(1, cursor.getEntityId());
  537 + stmt.setLong(2, cursor.getNextPartition());
  538 + stmt.setString(3, cursor.getKey());
  539 +
  540 + Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
  541 + @Override
  542 + public void onSuccess(@Nullable ResultSet result) {
  543 + deletePartitionAsync(cursor, resultFuture);
  544 + }
  545 +
  546 + @Override
  547 + public void onFailure(Throwable t) {
  548 + log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
  549 + }
  550 + }, readResultsProcessingExecutor);
  551 + }
  552 + }
  553 +
  554 + private PreparedStatement getDeletePartitionStmt() {
  555 + if (deletePartitionStmt == null) {
  556 + deletePartitionStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_PARTITIONS_CF +
  557 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
  558 + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
  559 + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
  560 + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM);
  561 + }
  562 + return deletePartitionStmt;
  563 + }
  564 +
366 565 private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
367 566 List<TsKvEntry> entries = new ArrayList<>(rows.size());
368 567 if (!rows.isEmpty()) {
... ... @@ -458,28 +657,43 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
458 657 return saveTtlStmts[dataType.ordinal()];
459 658 }
460 659
461   - private PreparedStatement getFetchStmt(Aggregation aggType) {
462   - if (fetchStmts == null) {
463   - fetchStmts = new PreparedStatement[Aggregation.values().length];
464   - for (Aggregation type : Aggregation.values()) {
465   - if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
466   - fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
467   - } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
468   - fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
469   - } else {
470   - fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
471   - String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
472   - + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
473   - + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
474   - + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
475   - + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
476   - + "AND " + ModelConstants.TS_COLUMN + " > ? "
477   - + "AND " + ModelConstants.TS_COLUMN + " <= ?"
478   - + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
  660 + private PreparedStatement getFetchStmt(Aggregation aggType, String orderBy) {
  661 + switch (orderBy) {
  662 + case ASC_ORDER:
  663 + if (fetchStmtsAsc == null) {
  664 + fetchStmtsAsc = initFetchStmt(orderBy);
  665 + }
  666 + return fetchStmtsAsc[aggType.ordinal()];
  667 + case DESC_ORDER:
  668 + if (fetchStmtsDesc == null) {
  669 + fetchStmtsDesc = initFetchStmt(orderBy);
479 670 }
  671 + return fetchStmtsDesc[aggType.ordinal()];
  672 + default:
  673 + throw new RuntimeException("Not supported" + orderBy + "order!");
  674 + }
  675 + }
  676 +
  677 + private PreparedStatement[] initFetchStmt(String orderBy) {
  678 + PreparedStatement[] fetchStmts = new PreparedStatement[Aggregation.values().length];
  679 + for (Aggregation type : Aggregation.values()) {
  680 + if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
  681 + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
  682 + } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
  683 + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
  684 + } else {
  685 + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
  686 + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
  687 + + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
  688 + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
  689 + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
  690 + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
  691 + + "AND " + ModelConstants.TS_COLUMN + " > ? "
  692 + + "AND " + ModelConstants.TS_COLUMN + " <= ?"
  693 + + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
480 694 }
481 695 }
482   - return fetchStmts[aggType.ordinal()];
  696 + return fetchStmts;
483 697 }
484 698
485 699 private PreparedStatement getLatestStmt() {
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import lombok.Getter;
  19 +import org.thingsboard.server.common.data.kv.TsKvQuery;
  20 +
  21 +import java.util.List;
  22 +import java.util.UUID;
  23 +
  24 +public class QueryCursor {
  25 +
  26 + @Getter
  27 + protected final String entityType;
  28 + @Getter
  29 + protected final UUID entityId;
  30 + @Getter
  31 + protected final String key;
  32 + @Getter
  33 + private final long startTs;
  34 + @Getter
  35 + private final long endTs;
  36 +
  37 + final List<Long> partitions;
  38 + private int partitionIndex;
  39 +
  40 + public QueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
  41 + this.entityType = entityType;
  42 + this.entityId = entityId;
  43 + this.key = baseQuery.getKey();
  44 + this.startTs = baseQuery.getStartTs();
  45 + this.endTs = baseQuery.getEndTs();
  46 + this.partitions = partitions;
  47 + this.partitionIndex = partitions.size() - 1;
  48 + }
  49 +
  50 + public boolean hasNextPartition() {
  51 + return partitionIndex >= 0;
  52 + }
  53 +
  54 + public long getNextPartition() {
  55 + long partition = partitions.get(partitionIndex);
  56 + partitionIndex--;
  57 + return partition;
  58 + }
  59 +
  60 +}
... ...
... ... @@ -17,8 +17,9 @@ package org.thingsboard.server.dao.timeseries;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.id.EntityId;
  20 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  21 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
20 22 import org.thingsboard.server.common.data.kv.TsKvEntry;
21   -import org.thingsboard.server.common.data.kv.TsKvQuery;
22 23
23 24 import java.util.List;
24 25
... ... @@ -27,7 +28,7 @@ import java.util.List;
27 28 */
28 29 public interface TimeseriesDao {
29 30
30   - ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries);
  31 + ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<ReadTsKvQuery> queries);
31 32
32 33 ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key);
33 34
... ... @@ -38,4 +39,10 @@ public interface TimeseriesDao {
38 39 ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl);
39 40
40 41 ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
  42 +
  43 + ListenableFuture<Void> remove(EntityId entityId, DeleteTsKvQuery query);
  44 +
  45 + ListenableFuture<Void> removeLatest(EntityId entityId, DeleteTsKvQuery query);
  46 +
  47 + ListenableFuture<Void> removePartition(EntityId entityId, DeleteTsKvQuery query);
41 48 }
... ...
... ... @@ -17,8 +17,9 @@ package org.thingsboard.server.dao.timeseries;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19 19 import org.thingsboard.server.common.data.id.EntityId;
  20 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  21 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
20 22 import org.thingsboard.server.common.data.kv.TsKvEntry;
21   -import org.thingsboard.server.common.data.kv.TsKvQuery;
22 23
23 24 import java.util.Collection;
24 25 import java.util.List;
... ... @@ -28,7 +29,7 @@ import java.util.List;
28 29 */
29 30 public interface TimeseriesService {
30 31
31   - ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries);
  32 + ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<ReadTsKvQuery> queries);
32 33
33 34 ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys);
34 35
... ... @@ -37,4 +38,6 @@ public interface TimeseriesService {
37 38 ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
38 39
39 40 ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
  41 +
  42 + ListenableFuture<List<Void>> remove(EntityId entityId, List<DeleteTsKvQuery> queries);
40 43 }
... ...
... ... @@ -16,57 +16,53 @@
16 16 package org.thingsboard.server.dao.timeseries;
17 17
18 18 import lombok.Getter;
  19 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
19 20 import org.thingsboard.server.common.data.kv.TsKvEntry;
20   -import org.thingsboard.server.common.data.kv.TsKvQuery;
21 21
22 22 import java.util.ArrayList;
23 23 import java.util.List;
24 24 import java.util.UUID;
25 25
  26 +import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.DESC_ORDER;
  27 +
26 28 /**
27 29 * Created by ashvayka on 21.02.17.
28 30 */
29   -public class TsKvQueryCursor {
30   - @Getter
31   - private final String entityType;
32   - @Getter
33   - private final UUID entityId;
34   - @Getter
35   - private final String key;
36   - @Getter
37   - private final long startTs;
38   - @Getter
39   - private final long endTs;
40   - private final List<Long> partitions;
  31 +public class TsKvQueryCursor extends QueryCursor {
  32 +
41 33 @Getter
42 34 private final List<TsKvEntry> data;
  35 + @Getter
  36 + private String orderBy;
43 37
44 38 private int partitionIndex;
45 39 private int currentLimit;
46 40
47   - public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
48   - this.entityType = entityType;
49   - this.entityId = entityId;
50   - this.key = baseQuery.getKey();
51   - this.startTs = baseQuery.getStartTs();
52   - this.endTs = baseQuery.getEndTs();
53   - this.partitions = partitions;
54   - this.partitionIndex = partitions.size() - 1;
  41 + public TsKvQueryCursor(String entityType, UUID entityId, ReadTsKvQuery baseQuery, List<Long> partitions) {
  42 + super(entityType, entityId, baseQuery, partitions);
  43 + this.orderBy = baseQuery.getOrderBy();
  44 + this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
55 45 this.data = new ArrayList<>();
56 46 this.currentLimit = baseQuery.getLimit();
57 47 }
58 48
  49 + @Override
59 50 public boolean hasNextPartition() {
60   - return partitionIndex >= 0;
  51 + return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1;
61 52 }
62 53
63 54 public boolean isFull() {
64 55 return currentLimit <= 0;
65 56 }
66 57
  58 + @Override
67 59 public long getNextPartition() {
68 60 long partition = partitions.get(partitionIndex);
69   - partitionIndex--;
  61 + if (isDesc()) {
  62 + partitionIndex--;
  63 + } else {
  64 + partitionIndex++;
  65 + }
70 66 return partition;
71 67 }
72 68
... ... @@ -78,4 +74,8 @@ public class TsKvQueryCursor {
78 74 currentLimit -= newData.size();
79 75 data.addAll(newData);
80 76 }
  77 +
  78 + private boolean isDesc() {
  79 + return orderBy.equals(DESC_ORDER);
  80 + }
81 81 }
... ...
... ... @@ -21,7 +21,8 @@ import org.junit.Assert;
21 21 import org.junit.Test;
22 22 import org.thingsboard.server.common.data.id.DeviceId;
23 23 import org.thingsboard.server.common.data.kv.Aggregation;
24   -import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
  24 +import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
  25 +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
25 26 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
26 27 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
27 28 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
... ... @@ -53,6 +54,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
53 54 private static final String BOOLEAN_KEY = "booleanKey";
54 55
55 56 private static final long TS = 42L;
  57 + private static final String DESC_ORDER = "DESC";
56 58
57 59 KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
58 60 KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);
... ... @@ -101,6 +103,26 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
101 103 }
102 104
103 105 @Test
  106 + public void testDeleteDeviceTsData() throws Exception {
  107 + DeviceId deviceId = new DeviceId(UUIDs.timeBased());
  108 +
  109 + saveEntries(deviceId, 10000);
  110 + saveEntries(deviceId, 20000);
  111 + saveEntries(deviceId, 30000);
  112 + saveEntries(deviceId, 40000);
  113 +
  114 + tsService.remove(deviceId, Collections.singletonList(
  115 + new BaseDeleteTsKvQuery(STRING_KEY, 15000, 45000))).get();
  116 +
  117 + List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
  118 + new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get();
  119 + Assert.assertEquals(1, list.size());
  120 +
  121 + List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
  122 + Assert.assertEquals(null, latest.get(0).getValueAsString());
  123 + }
  124 +
  125 + @Test
104 126 public void testFindDeviceTsData() throws Exception {
105 127 DeviceId deviceId = new DeviceId(UUIDs.timeBased());
106 128 List<TsKvEntry> entries = new ArrayList<>();
... ... @@ -114,7 +136,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
114 136 entries.add(save(deviceId, 45000, 500));
115 137 entries.add(save(deviceId, 55000, 600));
116 138
117   - List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  139 + List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
118 140 60000, 20000, 3, Aggregation.NONE))).get();
119 141 assertEquals(3, list.size());
120 142 assertEquals(55000, list.get(0).getTs());
... ... @@ -126,7 +148,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
126 148 assertEquals(35000, list.get(2).getTs());
127 149 assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
128 150
129   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  151 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
130 152 60000, 20000, 3, Aggregation.AVG))).get();
131 153 assertEquals(3, list.size());
132 154 assertEquals(10000, list.get(0).getTs());
... ... @@ -138,7 +160,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
138 160 assertEquals(50000, list.get(2).getTs());
139 161 assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
140 162
141   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  163 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
142 164 60000, 20000, 3, Aggregation.SUM))).get();
143 165
144 166 assertEquals(3, list.size());
... ... @@ -151,7 +173,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
151 173 assertEquals(50000, list.get(2).getTs());
152 174 assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
153 175
154   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  176 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
155 177 60000, 20000, 3, Aggregation.MIN))).get();
156 178
157 179 assertEquals(3, list.size());
... ... @@ -164,7 +186,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
164 186 assertEquals(50000, list.get(2).getTs());
165 187 assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
166 188
167   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  189 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
168 190 60000, 20000, 3, Aggregation.MAX))).get();
169 191
170 192 assertEquals(3, list.size());
... ... @@ -177,7 +199,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
177 199 assertEquals(50000, list.get(2).getTs());
178 200 assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
179 201
180   - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
  202 + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
181 203 60000, 20000, 3, Aggregation.COUNT))).get();
182 204
183 205 assertEquals(3, list.size());
... ...