Commit 75b77f09f8e4e2535988adc1309da24bfbb5c1ef

Authored by Andrew Shvayka
Committed by GitHub
2 parents 68c7f8a9 7f01a980

Merge pull request #1014 from davidgin/fixedPartitions

address issue  Add partition granularity to time series data #1006  h…
@@ -205,7 +205,7 @@ cassandra: @@ -205,7 +205,7 @@ cassandra:
205 read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}" 205 read_consistency_level: "${CASSANDRA_READ_CONSISTENCY_LEVEL:ONE}"
206 write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}" 206 write_consistency_level: "${CASSANDRA_WRITE_CONSISTENCY_LEVEL:ONE}"
207 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}" 207 default_fetch_size: "${CASSANDRA_DEFAULT_FETCH_SIZE:2000}"
208 - # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS 208 + # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS,INDEFINITE
209 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" 209 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
210 ts_key_value_ttl: "${TS_KV_TTL:0}" 210 ts_key_value_ttl: "${TS_KV_TTL:0}"
211 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" 211 buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
@@ -54,10 +54,11 @@ import javax.annotation.PreDestroy; @@ -54,10 +54,11 @@ import javax.annotation.PreDestroy;
54 import java.time.Instant; 54 import java.time.Instant;
55 import java.time.LocalDateTime; 55 import java.time.LocalDateTime;
56 import java.time.ZoneOffset; 56 import java.time.ZoneOffset;
57 -import java.util.ArrayList;  
58 -import java.util.Collections; 57 +import java.util.Arrays;
59 import java.util.List; 58 import java.util.List;
  59 +import java.util.ArrayList;
60 import java.util.Optional; 60 import java.util.Optional;
  61 +import java.util.Collections;
61 import java.util.stream.Collectors; 62 import java.util.stream.Collectors;
62 63
63 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; 64 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -76,6 +77,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -76,6 +77,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
76 public static final String SELECT_PREFIX = "SELECT "; 77 public static final String SELECT_PREFIX = "SELECT ";
77 public static final String EQUALS_PARAM = " = ? "; 78 public static final String EQUALS_PARAM = " = ? ";
78 79
  80 +
  81 + private static List<Long> FIXED_PARTITION = Arrays.asList(new Long[]{0L});
  82 +
79 @Autowired 83 @Autowired
80 private Environment environment; 84 private Environment environment;
81 85
@@ -163,14 +167,25 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -163,14 +167,25 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
163 } 167 }
164 } 168 }
165 169
  170 + public boolean isFixedPartitioning() {
  171 + return tsFormat.getTruncateUnit().equals(TsPartitionDate.EPOCH_START);
  172 + }
  173 +
  174 + private ListenableFuture<List<Long>> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) {
  175 + if (isFixedPartitioning()) { //no need to fetch partitions from DB
  176 + return Futures.immediateFuture(FIXED_PARTITION);
  177 + }
  178 + ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
  179 + return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
  180 + }
  181 +
166 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) { 182 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
  183 +
167 long minPartition = toPartitionTs(query.getStartTs()); 184 long minPartition = toPartitionTs(query.getStartTs());
168 long maxPartition = toPartitionTs(query.getEndTs()); 185 long maxPartition = toPartitionTs(query.getEndTs());
169 186
170 - ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);  
171 - 187 + final ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
172 final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>(); 188 final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>();
173 - final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);  
174 189
175 Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() { 190 Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
176 @Override 191 @Override
@@ -181,7 +196,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -181,7 +196,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
181 196
182 @Override 197 @Override
183 public void onFailure(Throwable t) { 198 public void onFailure(Throwable t) {
184 - log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); 199 + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), toPartitionTs(query.getStartTs()), toPartitionTs(query.getEndTs()), t);
185 } 200 }
186 }, readResultsProcessingExecutor); 201 }, readResultsProcessingExecutor);
187 202
@@ -229,10 +244,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -229,10 +244,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
229 final long endTs = query.getEndTs(); 244 final long endTs = query.getEndTs();
230 final long ts = startTs + (endTs - startTs) / 2; 245 final long ts = startTs + (endTs - startTs) / 2;
231 246
232 - ResultSetFuture partitionsFuture = fetchPartitions(entityId, key, minPartition, maxPartition);  
233 -  
234 - ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);  
235 247
  248 + ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition);
236 ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture, 249 ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
237 getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor); 250 getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
238 251
@@ -308,6 +321,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -308,6 +321,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
308 321
309 @Override 322 @Override
310 public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) { 323 public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
  324 + if (isFixedPartitioning()) {
  325 + return Futures.immediateFuture(null);
  326 + }
311 ttl = computeTtl(ttl); 327 ttl = computeTtl(ttl);
312 long partition = toPartitionTs(tsKvEntryTs); 328 long partition = toPartitionTs(tsKvEntryTs);
313 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); 329 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
@@ -16,22 +16,25 @@ @@ -16,22 +16,25 @@
16 package org.thingsboard.server.dao.timeseries; 16 package org.thingsboard.server.dao.timeseries;
17 17
18 import java.time.LocalDateTime; 18 import java.time.LocalDateTime;
  19 +import java.time.ZoneOffset;
19 import java.time.temporal.ChronoUnit; 20 import java.time.temporal.ChronoUnit;
20 import java.time.temporal.TemporalUnit; 21 import java.time.temporal.TemporalUnit;
21 import java.util.Optional; 22 import java.util.Optional;
22 23
23 public enum TsPartitionDate { 24 public enum TsPartitionDate {
24 25
25 - MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS); 26 + MINUTES("yyyy-MM-dd-HH-mm", ChronoUnit.MINUTES), HOURS("yyyy-MM-dd-HH", ChronoUnit.HOURS), DAYS("yyyy-MM-dd", ChronoUnit.DAYS), MONTHS("yyyy-MM", ChronoUnit.MONTHS), YEARS("yyyy", ChronoUnit.YEARS),INDEFINITE("",ChronoUnit.FOREVER);
26 27
27 private final String pattern; 28 private final String pattern;
28 private final transient TemporalUnit truncateUnit; 29 private final transient TemporalUnit truncateUnit;
  30 + public final static LocalDateTime EPOCH_START = LocalDateTime.ofEpochSecond(0,0, ZoneOffset.UTC);
29 31
30 TsPartitionDate(String pattern, TemporalUnit truncateUnit) { 32 TsPartitionDate(String pattern, TemporalUnit truncateUnit) {
31 this.pattern = pattern; 33 this.pattern = pattern;
32 this.truncateUnit = truncateUnit; 34 this.truncateUnit = truncateUnit;
33 } 35 }
34 36
  37 +
35 public String getPattern() { 38 public String getPattern() {
36 return pattern; 39 return pattern;
37 } 40 }
@@ -46,6 +49,8 @@ public enum TsPartitionDate { @@ -46,6 +49,8 @@ public enum TsPartitionDate {
46 return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1); 49 return time.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1);
47 case YEARS: 50 case YEARS:
48 return time.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1); 51 return time.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
  52 + case INDEFINITE:
  53 + return EPOCH_START;
49 default: 54 default:
50 return time.truncatedTo(truncateUnit); 55 return time.truncatedTo(truncateUnit);
51 } 56 }