Commit d5319c9de041c785db4dc787806726c92c9ba61c

Authored by Volodymyr Babak
Committed by Andrew Shvayka
1 parent e27ef59e

Partitions should not be removed by custom TTL

@@ -100,7 +100,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq @@ -100,7 +100,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
100 } 100 }
101 101
102 @Override 102 @Override
103 - public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { 103 + public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key) {
104 return Futures.immediateFuture(null); 104 return Futures.immediateFuture(null);
105 } 105 }
106 106
@@ -124,7 +124,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @@ -124,7 +124,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
124 } 124 }
125 125
126 @Override 126 @Override
127 - public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { 127 + public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key) {
128 return Futures.immediateFuture(0); 128 return Futures.immediateFuture(0);
129 } 129 }
130 130
@@ -170,7 +170,7 @@ public class BaseTimeseriesService implements TimeseriesService { @@ -170,7 +170,7 @@ public class BaseTimeseriesService implements TimeseriesService {
170 if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { 170 if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
171 throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only"); 171 throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
172 } 172 }
173 - futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl)); 173 + futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey()));
174 futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor())); 174 futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor()));
175 futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl)); 175 futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
176 } 176 }
@@ -181,11 +181,14 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @@ -181,11 +181,14 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
181 } 181 }
182 182
183 @Override 183 @Override
184 - public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) { 184 + public ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key) {
185 if (isFixedPartitioning()) { 185 if (isFixedPartitioning()) {
186 return Futures.immediateFuture(null); 186 return Futures.immediateFuture(null);
187 } 187 }
188 - ttl = computeTtl(ttl); 188 + // DO NOT apply custom to partition, otherwise short TTL will remove partition too early
  189 + // partitions must remain in the DB forever or be removed only by systemTtl
  190 + // removal of empty partition is too expensive (we need to scan all data keys for this partitions with ALLOW FILTERING)
  191 + long ttl = computeTtl(0);
189 long partition = toPartitionTs(tsKvEntryTs); 192 long partition = toPartitionTs(tsKvEntryTs);
190 if (cassandraTsPartitionsCache == null) { 193 if (cassandraTsPartitionsCache == null) {
191 return doSavePartition(tenantId, entityId, key, ttl, partition); 194 return doSavePartition(tenantId, entityId, key, ttl, partition);
@@ -33,7 +33,7 @@ public interface TimeseriesDao { @@ -33,7 +33,7 @@ public interface TimeseriesDao {
33 33
34 ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl); 34 ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl);
35 35
36 - ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl); 36 + ListenableFuture<Integer> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key);
37 37
38 ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); 38 ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
39 39
@@ -100,10 +100,10 @@ public class CassandraPartitionsCacheTest { @@ -100,10 +100,10 @@ public class CassandraPartitionsCacheTest {
100 long tsKvEntryTs = System.currentTimeMillis(); 100 long tsKvEntryTs = System.currentTimeMillis();
101 101
102 for (int i = 0; i < 50000; i++) { 102 for (int i = 0; i < 50000; i++) {
103 - cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0); 103 + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i);
104 } 104 }
105 for (int i = 0; i < 60000; i++) { 105 for (int i = 0; i < 60000; i++) {
106 - cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i, 0); 106 + cassandraBaseTimeseriesDao.savePartition(tenantId, tenantId, tsKvEntryTs, "test" + i);
107 } 107 }
108 verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class)); 108 verify(cassandraBaseTimeseriesDao, times(60000)).executeAsyncWrite(any(TenantId.class), any(Statement.class));
109 } 109 }