Commit 914a052ae4a5c6d555bf22af6630e98a506d1090

Authored by Dima Landiak
2 parents 4074870c 712e1ae9

merged with deleting timeseries

... ... @@ -76,7 +76,7 @@ public class DeviceController extends BaseController {
76 76 device.setTenantId(getCurrentUser().getTenantId());
77 77 if (getCurrentUser().getAuthority() == Authority.CUSTOMER_USER) {
78 78 if (device.getId() == null || device.getId().isNullUid() ||
79   - device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
  79 + device.getCustomerId() == null || device.getCustomerId().isNullUid()) {
80 80 throw new ThingsboardException("You don't have permission to perform this operation!",
81 81 ThingsboardErrorCode.PERMISSION_DENIED);
82 82 } else {
... ...
... ... @@ -201,7 +201,7 @@ public class TelemetryController extends BaseController {
201 201 (result, entityId) -> {
202 202 // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
203 203 Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr);
204   - List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg))
  204 + List<TsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg, "DESC", false))
205 205 .collect(Collectors.toList());
206 206
207 207 Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result));
... ...
... ... @@ -251,7 +251,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
251 251 }
252 252 EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
253 253 List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
254   - List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg())))
  254 + List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC", false))
255 255 .collect(Collectors.toList());
256 256
257 257 FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
... ... @@ -338,7 +338,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
338 338 startTs = cmd.getStartTs();
339 339 long endTs = cmd.getStartTs() + cmd.getTimeWindow();
340 340 List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
341   - getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList());
  341 + getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC", false)).collect(Collectors.toList());
342 342
343 343 final FutureCallback<List<TsKvEntry>> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys);
344 344 accessValidator.validate(sessionRef.getSecurityCtx(), entityId,
... ...
... ... @@ -26,18 +26,23 @@ public class BaseTsKvQuery implements TsKvQuery {
26 26 private final long interval;
27 27 private final int limit;
28 28 private final Aggregation aggregation;
  29 + private final String orderBy;
  30 + private final Boolean rewriteLatestIfDeleted;
29 31
30   - public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) {
  32 + public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy,
  33 + boolean rewriteLatestIfDeleted) {
31 34 this.key = key;
32 35 this.startTs = startTs;
33 36 this.endTs = endTs;
34 37 this.interval = interval;
35 38 this.limit = limit;
36 39 this.aggregation = aggregation;
  40 + this.orderBy = orderBy;
  41 + this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
37 42 }
38 43
39 44 public BaseTsKvQuery(String key, long startTs, long endTs) {
40   - this(key, startTs, endTs, endTs-startTs, 1, Aggregation.AVG);
  45 + this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC", false);
41 46 }
42 47
43 48 }
... ...
... ... @@ -29,4 +29,7 @@ public interface TsKvQuery {
29 29
30 30 Aggregation getAggregation();
31 31
  32 + String getOrderBy();
  33 +
  34 + Boolean getRewriteLatestIfDeleted();
32 35 }
... ...
... ... @@ -306,6 +306,36 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
306 306 });
307 307 }
308 308
  309 + @Override
  310 + public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
  311 + return service.submit(() -> {
  312 + tsKvRepository.delete(
  313 + fromTimeUUID(entityId.getId()),
  314 + entityId.getEntityType(),
  315 + query.getKey(),
  316 + query.getStartTs(),
  317 + query.getEndTs());
  318 + return null;
  319 + });
  320 + }
  321 +
  322 + @Override
  323 + public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
  324 + TsKvLatestEntity latestEntity = new TsKvLatestEntity();
  325 + latestEntity.setEntityType(entityId.getEntityType());
  326 + latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
  327 + latestEntity.setKey(query.getKey());
  328 + return service.submit(() -> {
  329 + tsKvLatestRepository.delete(latestEntity);
  330 + return null;
  331 + });
  332 + }
  333 +
  334 + @Override
  335 + public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
  336 + return service.submit(() -> null);
  337 + }
  338 +
309 339 @PreDestroy
310 340 void onDestroy() {
311 341 if (insertService != null) {
... ...
... ... @@ -16,10 +16,12 @@
16 16 package org.thingsboard.server.dao.sql.timeseries;
17 17
18 18 import org.springframework.data.domain.Pageable;
  19 +import org.springframework.data.jpa.repository.Modifying;
19 20 import org.springframework.data.jpa.repository.Query;
20 21 import org.springframework.data.repository.CrudRepository;
21 22 import org.springframework.data.repository.query.Param;
22 23 import org.springframework.scheduling.annotation.Async;
  24 +import org.springframework.transaction.annotation.Transactional;
23 25 import org.thingsboard.server.common.data.EntityType;
24 26 import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
25 27 import org.thingsboard.server.dao.model.sql.TsKvEntity;
... ... @@ -41,6 +43,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
41 43 @Param("endTs") long endTs,
42 44 Pageable pageable);
43 45
  46 + @Transactional
  47 + @Modifying
  48 + @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
  49 + "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
  50 + "AND tskv.ts > :startTs AND tskv.ts < :endTs")
  51 + void delete(@Param("entityId") String entityId,
  52 + @Param("entityType") EntityType entityType,
  53 + @Param("entityKey") String key,
  54 + @Param("startTs") long startTs,
  55 + @Param("endTs") long endTs);
  56 +
44 57 @Async
45 58 @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
46 59 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
... ... @@ -56,30 +69,30 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
56 69 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
57 70 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
58 71 CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
59   - @Param("entityType") EntityType entityType,
60   - @Param("entityKey") String entityKey,
61   - @Param("startTs") long startTs,
62   - @Param("endTs") long endTs);
  72 + @Param("entityType") EntityType entityType,
  73 + @Param("entityKey") String entityKey,
  74 + @Param("startTs") long startTs,
  75 + @Param("endTs") long endTs);
63 76
64 77 @Async
65 78 @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
66 79 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
67 80 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
68 81 CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
69   - @Param("entityType") EntityType entityType,
70   - @Param("entityKey") String entityKey,
71   - @Param("startTs") long startTs,
72   - @Param("endTs") long endTs);
  82 + @Param("entityType") EntityType entityType,
  83 + @Param("entityKey") String entityKey,
  84 + @Param("startTs") long startTs,
  85 + @Param("endTs") long endTs);
73 86
74 87 @Async
75 88 @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
76 89 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
77 90 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
78 91 CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
79   - @Param("entityType") EntityType entityType,
80   - @Param("entityKey") String entityKey,
81   - @Param("startTs") long startTs,
82   - @Param("endTs") long endTs);
  92 + @Param("entityType") EntityType entityType,
  93 + @Param("entityKey") String entityKey,
  94 + @Param("startTs") long startTs,
  95 + @Param("endTs") long endTs);
83 96
84 97
85 98 @Async
... ... @@ -87,8 +100,8 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
87 100 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
88 101 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
89 102 CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
90   - @Param("entityType") EntityType entityType,
91   - @Param("entityKey") String entityKey,
92   - @Param("startTs") long startTs,
93   - @Param("endTs") long endTs);
  103 + @Param("entityType") EntityType entityType,
  104 + @Param("entityKey") String entityKey,
  105 + @Param("startTs") long startTs,
  106 + @Param("endTs") long endTs);
94 107 }
... ...
... ... @@ -40,6 +40,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
40 40 public class BaseTimeseriesService implements TimeseriesService {
41 41
42 42 public static final int INSERTS_PER_ENTRY = 3;
  43 + public static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY;
43 44
44 45 @Autowired
45 46 private TimeseriesDao timeseriesDao;
... ... @@ -95,6 +96,23 @@ public class BaseTimeseriesService implements TimeseriesService {
95 96 futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl));
96 97 }
97 98
  99 + @Override
  100 + public ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> tsKvQueries) {
  101 + validate(entityId);
  102 + tsKvQueries.forEach(BaseTimeseriesService::validate);
  103 + List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvQueries.size() * DELETES_PER_ENTRY);
  104 + for (TsKvQuery tsKvQuery : tsKvQueries) {
  105 + deleteAndRegisterFutures(futures, entityId, tsKvQuery);
  106 + }
  107 + return Futures.allAsList(futures);
  108 + }
  109 +
  110 + private void deleteAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvQuery query) {
  111 + futures.add(timeseriesDao.remove(entityId, query));
  112 + futures.add(timeseriesDao.removeLatest(entityId, query));
  113 + futures.add(timeseriesDao.removePartition(entityId, query));
  114 + }
  115 +
98 116 private static void validate(EntityId entityId) {
99 117 Validator.validateEntityId(entityId, "Incorrect entityId " + entityId);
100 118 }
... ...
... ... @@ -15,11 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.dao.timeseries;
17 17
18   -import com.datastax.driver.core.BoundStatement;
19   -import com.datastax.driver.core.PreparedStatement;
20   -import com.datastax.driver.core.ResultSet;
21   -import com.datastax.driver.core.ResultSetFuture;
22   -import com.datastax.driver.core.Row;
  18 +import com.datastax.driver.core.*;
23 19 import com.datastax.driver.core.querybuilder.QueryBuilder;
24 20 import com.datastax.driver.core.querybuilder.Select;
25 21 import com.google.common.base.Function;
... ... @@ -54,10 +50,7 @@ import javax.annotation.PreDestroy;
54 50 import java.time.Instant;
55 51 import java.time.LocalDateTime;
56 52 import java.time.ZoneOffset;
57   -import java.util.ArrayList;
58   -import java.util.Collections;
59   -import java.util.List;
60   -import java.util.Optional;
  53 +import java.util.*;
61 54 import java.util.stream.Collectors;
62 55
63 56 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
... ... @@ -75,6 +68,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
75 68 public static final String GENERATED_QUERY_FOR_ENTITY_TYPE_AND_ENTITY_ID = "Generated query [{}] for entityType {} and entityId {}";
76 69 public static final String SELECT_PREFIX = "SELECT ";
77 70 public static final String EQUALS_PARAM = " = ? ";
  71 + public static final String ASC_ORDER = "ASC";
  72 + public static final String DESC_ORDER = "DESC";
78 73
79 74 @Autowired
80 75 private Environment environment;
... ... @@ -92,9 +87,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
92 87 private PreparedStatement latestInsertStmt;
93 88 private PreparedStatement[] saveStmts;
94 89 private PreparedStatement[] saveTtlStmts;
95   - private PreparedStatement[] fetchStmts;
  90 + private PreparedStatement[] fetchStmtsAsc;
  91 + private PreparedStatement[] fetchStmtsDesc;
96 92 private PreparedStatement findLatestStmt;
97 93 private PreparedStatement findAllLatestStmt;
  94 + private PreparedStatement deleteStmt;
  95 + private PreparedStatement deletePartitionStmt;
98 96
99 97 private boolean isInstall() {
100 98 return environment.acceptsProfiles("install");
... ... @@ -104,7 +102,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
104 102 public void init() {
105 103 super.startExecutor();
106 104 if (!isInstall()) {
107   - getFetchStmt(Aggregation.NONE);
  105 + getFetchStmt(Aggregation.NONE, DESC_ORDER);
108 106 Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
109 107 if (partition.isPresent()) {
110 108 tsFormat = partition.get();
... ... @@ -148,7 +146,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
148 146 while (stepTs < query.getEndTs()) {
149 147 long startTs = stepTs;
150 148 long endTs = stepTs + step;
151   - TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation());
  149 + TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy(), false);
152 150 futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
153 151 stepTs = endTs;
154 152 }
... ... @@ -197,7 +195,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
197 195 if (cursor.isFull() || !cursor.hasNextPartition()) {
198 196 resultFuture.set(cursor.getData());
199 197 } else {
200   - PreparedStatement proto = getFetchStmt(Aggregation.NONE);
  198 + PreparedStatement proto = getFetchStmt(Aggregation.NONE, cursor.getOrderBy());
201 199 BoundStatement stmt = proto.bind();
202 200 stmt.setString(0, cursor.getEntityType());
203 201 stmt.setUUID(1, cursor.getEntityId());
... ... @@ -247,7 +245,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
247 245 private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
248 246 return partitions -> {
249 247 try {
250   - PreparedStatement proto = getFetchStmt(aggregation);
  248 + PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER);
251 249 List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
252 250 for (Long partition : partitions) {
253 251 log.trace("Fetching data for partition [{}] for entityType {} and entityId {}", partition, entityId.getEntityType(), entityId.getId());
... ... @@ -347,6 +345,204 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
347 345 return getFuture(executeAsyncWrite(stmt), rs -> null);
348 346 }
349 347
  348 + @Override
  349 + public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
  350 + long minPartition = toPartitionTs(query.getStartTs());
  351 + long maxPartition = toPartitionTs(query.getEndTs());
  352 +
  353 + ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
  354 +
  355 + final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
  356 + final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
  357 +
  358 + Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
  359 + @Override
  360 + public void onSuccess(@Nullable List<Long> partitions) {
  361 + TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions);
  362 + deleteAsync(cursor, resultFuture);
  363 + }
  364 +
  365 + @Override
  366 + public void onFailure(Throwable t) {
  367 + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
  368 + }
  369 + }, readResultsProcessingExecutor);
  370 + return resultFuture;
  371 + }
  372 +
  373 + private void deleteAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
  374 + if (!cursor.hasNextPartition()) {
  375 + resultFuture.set(null);
  376 + } else {
  377 + PreparedStatement proto = getDeleteStmt();
  378 + BoundStatement stmt = proto.bind();
  379 + stmt.setString(0, cursor.getEntityType());
  380 + stmt.setUUID(1, cursor.getEntityId());
  381 + stmt.setString(2, cursor.getKey());
  382 + stmt.setLong(3, cursor.getNextPartition());
  383 + stmt.setLong(4, cursor.getStartTs());
  384 + stmt.setLong(5, cursor.getEndTs());
  385 +
  386 + Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
  387 + @Override
  388 + public void onSuccess(@Nullable ResultSet result) {
  389 + deleteAsync(cursor, resultFuture);
  390 + }
  391 +
  392 + @Override
  393 + public void onFailure(Throwable t) {
  394 + log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
  395 + }
  396 + }, readResultsProcessingExecutor);
  397 + }
  398 + }
  399 +
  400 + private PreparedStatement getDeleteStmt() {
  401 + if (deleteStmt == null) {
  402 + deleteStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_CF +
  403 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
  404 + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
  405 + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
  406 + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
  407 + + "AND " + ModelConstants.TS_COLUMN + " > ? "
  408 + + "AND " + ModelConstants.TS_COLUMN + " <= ?");
  409 + }
  410 + return deleteStmt;
  411 + }
  412 +
  413 + @Override
  414 + public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
  415 + ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(entityId, query.getKey());
  416 +
  417 + ListenableFuture<Boolean> booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> {
  418 + long ts = latestEntry.getTs();
  419 + if (ts >= query.getStartTs() && ts <= query.getEndTs()) {
  420 + return Futures.immediateFuture(true);
  421 + } else {
  422 + log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey());
  423 + }
  424 + return Futures.immediateFuture(false);
  425 + }, readResultsProcessingExecutor);
  426 +
  427 + ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
  428 + if (isRemove) {
  429 + return deleteLatest(entityId, query.getKey());
  430 + }
  431 + return Futures.immediateFuture(null);
  432 + }, readResultsProcessingExecutor);
  433 +
  434 + if (query.getRewriteLatestIfDeleted()) {
  435 + ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
  436 + if (isRemove) {
  437 + return getNewLatestEntryFuture(entityId, query);
  438 + }
  439 + return Futures.immediateFuture(null);
  440 + }, readResultsProcessingExecutor);
  441 +
  442 + return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
  443 + list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
  444 + }
  445 + return removedLatestFuture;
  446 + }
  447 +
  448 + private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
  449 + long startTs = 0;
  450 + long endTs = query.getStartTs() - 1;
  451 + TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
  452 + Aggregation.NONE, DESC_ORDER, false);
  453 + ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
  454 +
  455 + return Futures.transformAsync(future, entryList -> {
  456 + if (entryList.size() == 1) {
  457 + return saveLatest(entityId, entryList.get(0));
  458 + } else {
  459 + log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
  460 + }
  461 + return Futures.immediateFuture(null);
  462 + }, readResultsProcessingExecutor);
  463 + }
  464 +
  465 + private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
  466 + Statement delete = QueryBuilder.delete().all().from(ModelConstants.TS_KV_LATEST_CF)
  467 + .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
  468 + .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
  469 + .and(eq(ModelConstants.KEY_COLUMN, key));
  470 + log.debug("Remove request: {}", delete.toString());
  471 + return getFuture(executeAsyncWrite(delete), rs -> null);
  472 + }
  473 +
  474 + @Override
  475 + public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
  476 + long minPartition = toPartitionTs(query.getStartTs());
  477 + long maxPartition = toPartitionTs(query.getEndTs());
  478 + if (minPartition == maxPartition) {
  479 + return Futures.immediateFuture(null);
  480 + } else {
  481 + ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
  482 +
  483 + final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
  484 + final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
  485 +
  486 + Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
  487 + @Override
  488 + public void onSuccess(@Nullable List<Long> partitions) {
  489 + int index = 0;
  490 + if (minPartition != query.getStartTs()) {
  491 + index = 1;
  492 + }
  493 + List<Long> partitionsToDelete = new ArrayList<>();
  494 + for (int i = index; i < partitions.size() - 1; i++) {
  495 + partitionsToDelete.add(partitions.get(i));
  496 + }
  497 + TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
  498 + deletePartitionAsync(cursor, resultFuture);
  499 + }
  500 +
  501 + @Override
  502 + public void onFailure(Throwable t) {
  503 + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
  504 + }
  505 + }, readResultsProcessingExecutor);
  506 + return resultFuture;
  507 + }
  508 + }
  509 +
  510 + private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {
  511 + if (!cursor.hasNextPartition()) {
  512 + resultFuture.set(null);
  513 + } else {
  514 + PreparedStatement proto = getDeletePartitionStmt();
  515 + BoundStatement stmt = proto.bind();
  516 + stmt.setString(0, cursor.getEntityType());
  517 + stmt.setUUID(1, cursor.getEntityId());
  518 + stmt.setLong(2, cursor.getNextPartition());
  519 + stmt.setString(3, cursor.getKey());
  520 +
  521 + Futures.addCallback(executeAsyncWrite(stmt), new FutureCallback<ResultSet>() {
  522 + @Override
  523 + public void onSuccess(@Nullable ResultSet result) {
  524 + deletePartitionAsync(cursor, resultFuture);
  525 + }
  526 +
  527 + @Override
  528 + public void onFailure(Throwable t) {
  529 + log.error("[{}][{}] Failed to delete data for query {}-{}", stmt, t);
  530 + }
  531 + }, readResultsProcessingExecutor);
  532 + }
  533 + }
  534 +
  535 + private PreparedStatement getDeletePartitionStmt() {
  536 + if (deletePartitionStmt == null) {
  537 + deletePartitionStmt = prepare("DELETE FROM " + ModelConstants.TS_KV_PARTITIONS_CF +
  538 + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
  539 + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
  540 + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
  541 + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM);
  542 + }
  543 + return deletePartitionStmt;
  544 + }
  545 +
350 546 private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
351 547 List<TsKvEntry> entries = new ArrayList<>(rows.size());
352 548 if (!rows.isEmpty()) {
... ... @@ -442,28 +638,43 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
442 638 return saveTtlStmts[dataType.ordinal()];
443 639 }
444 640
445   - private PreparedStatement getFetchStmt(Aggregation aggType) {
446   - if (fetchStmts == null) {
447   - fetchStmts = new PreparedStatement[Aggregation.values().length];
448   - for (Aggregation type : Aggregation.values()) {
449   - if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
450   - fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
451   - } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
452   - fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
453   - } else {
454   - fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
455   - String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
456   - + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
457   - + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
458   - + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
459   - + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
460   - + "AND " + ModelConstants.TS_COLUMN + " > ? "
461   - + "AND " + ModelConstants.TS_COLUMN + " <= ?"
462   - + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
  641 + private PreparedStatement getFetchStmt(Aggregation aggType, String orderBy) {
  642 + switch (orderBy) {
  643 + case ASC_ORDER:
  644 + if (fetchStmtsAsc == null) {
  645 + fetchStmtsAsc = initFetchStmt(orderBy);
  646 + }
  647 + return fetchStmtsAsc[aggType.ordinal()];
  648 + case DESC_ORDER:
  649 + if (fetchStmtsDesc == null) {
  650 + fetchStmtsDesc = initFetchStmt(orderBy);
463 651 }
  652 + return fetchStmtsDesc[aggType.ordinal()];
  653 + default:
  654 + throw new RuntimeException("Not supported" + orderBy + "order!");
  655 + }
  656 + }
  657 +
  658 + private PreparedStatement[] initFetchStmt(String orderBy) {
  659 + PreparedStatement[] fetchStmts = new PreparedStatement[Aggregation.values().length];
  660 + for (Aggregation type : Aggregation.values()) {
  661 + if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
  662 + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
  663 + } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
  664 + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
  665 + } else {
  666 + fetchStmts[type.ordinal()] = prepare(SELECT_PREFIX +
  667 + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
  668 + + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + EQUALS_PARAM
  669 + + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
  670 + + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
  671 + + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
  672 + + "AND " + ModelConstants.TS_COLUMN + " > ? "
  673 + + "AND " + ModelConstants.TS_COLUMN + " <= ?"
  674 + + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
464 675 }
465 676 }
466   - return fetchStmts[aggType.ordinal()];
  677 + return fetchStmts;
467 678 }
468 679
469 680 private PreparedStatement getLatestStmt() {
... ...
... ... @@ -38,4 +38,10 @@ public interface TimeseriesDao {
38 38 ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl);
39 39
40 40 ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
  41 +
  42 + ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query);
  43 +
  44 + ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query);
  45 +
  46 + ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query);
41 47 }
... ...
... ... @@ -37,4 +37,6 @@ public interface TimeseriesService {
37 37 ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
38 38
39 39 ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
  40 +
  41 + ListenableFuture<List<Void>> remove(EntityId entityId, List<TsKvQuery> queries);
40 42 }
... ...
... ... @@ -23,6 +23,8 @@ import java.util.ArrayList;
23 23 import java.util.List;
24 24 import java.util.UUID;
25 25
  26 +import static org.thingsboard.server.dao.timeseries.CassandraBaseTimeseriesDao.DESC_ORDER;
  27 +
26 28 /**
27 29 * Created by ashvayka on 21.02.17.
28 30 */
... ... @@ -40,6 +42,8 @@ public class TsKvQueryCursor {
40 42 private final List<Long> partitions;
41 43 @Getter
42 44 private final List<TsKvEntry> data;
  45 + @Getter
  46 + private String orderBy;
43 47
44 48 private int partitionIndex;
45 49 private int currentLimit;
... ... @@ -51,13 +55,14 @@ public class TsKvQueryCursor {
51 55 this.startTs = baseQuery.getStartTs();
52 56 this.endTs = baseQuery.getEndTs();
53 57 this.partitions = partitions;
54   - this.partitionIndex = partitions.size() - 1;
  58 + this.orderBy = baseQuery.getOrderBy();
  59 + this.partitionIndex = isDesc() ? partitions.size() - 1 : 0;
55 60 this.data = new ArrayList<>();
56 61 this.currentLimit = baseQuery.getLimit();
57 62 }
58 63
59 64 public boolean hasNextPartition() {
60   - return partitionIndex >= 0;
  65 + return isDesc() ? partitionIndex >= 0 : partitionIndex <= partitions.size() - 1;
61 66 }
62 67
63 68 public boolean isFull() {
... ... @@ -66,7 +71,11 @@ public class TsKvQueryCursor {
66 71
67 72 public long getNextPartition() {
68 73 long partition = partitions.get(partitionIndex);
69   - partitionIndex--;
  74 + if (isDesc()) {
  75 + partitionIndex--;
  76 + } else {
  77 + partitionIndex++;
  78 + }
70 79 return partition;
71 80 }
72 81
... ... @@ -78,4 +87,8 @@ public class TsKvQueryCursor {
78 87 currentLimit -= newData.size();
79 88 data.addAll(newData);
80 89 }
  90 +
  91 + private boolean isDesc() {
  92 + return orderBy.equals(DESC_ORDER);
  93 + }
81 94 }
... ...
... ... @@ -53,6 +53,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
53 53 private static final String BOOLEAN_KEY = "booleanKey";
54 54
55 55 private static final long TS = 42L;
  56 + private static final String DESC_ORDER = "DESC";
56 57
57 58 KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
58 59 KvEntry longKvEntry = new LongDataEntry(LONG_KEY, Long.MAX_VALUE);
... ... @@ -101,6 +102,28 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
101 102 }
102 103
103 104 @Test
  105 + public void testDeleteDeviceTsData() throws Exception {
  106 + DeviceId deviceId = new DeviceId(UUIDs.timeBased());
  107 +
  108 + saveEntries(deviceId, 10000);
  109 + saveEntries(deviceId, 20000);
  110 + saveEntries(deviceId, 30000);
  111 + saveEntries(deviceId, 40000);
  112 +
  113 + tsService.remove(deviceId, Collections.singletonList(
  114 + new BaseTsKvQuery(STRING_KEY, 15000, 45000, 10000, 0, Aggregation.NONE, DESC_ORDER,
  115 + false))).get();
  116 +
  117 + List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
  118 + new BaseTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE, DESC_ORDER,
  119 + false))).get();
  120 + Assert.assertEquals(1, list.size());
  121 +
  122 + List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
  123 + Assert.assertEquals(null, latest.get(0).getValueAsString());
  124 + }
  125 +
  126 + @Test
104 127 public void testFindDeviceTsData() throws Exception {
105 128 DeviceId deviceId = new DeviceId(UUIDs.timeBased());
106 129 List<TsKvEntry> entries = new ArrayList<>();
... ... @@ -115,7 +138,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
115 138 entries.add(save(deviceId, 55000, 600));
116 139
117 140 List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
118   - 60000, 20000, 3, Aggregation.NONE))).get();
  141 + 60000, 20000, 3, Aggregation.NONE, DESC_ORDER, false))).get();
119 142 assertEquals(3, list.size());
120 143 assertEquals(55000, list.get(0).getTs());
121 144 assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
... ... @@ -127,7 +150,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
127 150 assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
128 151
129 152 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
130   - 60000, 20000, 3, Aggregation.AVG))).get();
  153 + 60000, 20000, 3, Aggregation.AVG, DESC_ORDER, false))).get();
131 154 assertEquals(3, list.size());
132 155 assertEquals(10000, list.get(0).getTs());
133 156 assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
... ... @@ -139,7 +162,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
139 162 assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
140 163
141 164 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
142   - 60000, 20000, 3, Aggregation.SUM))).get();
  165 + 60000, 20000, 3, Aggregation.SUM, DESC_ORDER, false))).get();
143 166
144 167 assertEquals(3, list.size());
145 168 assertEquals(10000, list.get(0).getTs());
... ... @@ -152,7 +175,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
152 175 assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
153 176
154 177 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
155   - 60000, 20000, 3, Aggregation.MIN))).get();
  178 + 60000, 20000, 3, Aggregation.MIN, DESC_ORDER, false))).get();
156 179
157 180 assertEquals(3, list.size());
158 181 assertEquals(10000, list.get(0).getTs());
... ... @@ -165,7 +188,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
165 188 assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
166 189
167 190 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
168   - 60000, 20000, 3, Aggregation.MAX))).get();
  191 + 60000, 20000, 3, Aggregation.MAX, DESC_ORDER, false))).get();
169 192
170 193 assertEquals(3, list.size());
171 194 assertEquals(10000, list.get(0).getTs());
... ... @@ -178,7 +201,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
178 201 assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
179 202
180 203 list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
181   - 60000, 20000, 3, Aggregation.COUNT))).get();
  204 + 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER, false))).get();
182 205
183 206 assertEquals(3, list.size());
184 207 assertEquals(10000, list.get(0).getTs());
... ...