Commit d35d302a5048c8361301abe7d62b822a80a82f30

Authored by Andrew Shvayka
Committed by GitHub
2 parents d1c6a907 bf17acda

Merge pull request #1615 from dmytro-landiak/feature/set-nulls-cassandra

set data types to null cassandra
... ... @@ -173,6 +173,8 @@ cassandra:
173 173 callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}"
174 174 poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
175 175 rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
  176 + # set all data types values except target to null for the same ts on save
  177 + set_null_values_enabled: "${CASSANDRA_QUERY_SET_NULL_VALUES_ENABLED:false}"
176 178 tenant_rate_limits:
177 179 enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}"
178 180 configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}"
... ...
... ... @@ -94,6 +94,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
94 94 @Value("${cassandra.query.ts_key_value_ttl}")
95 95 private long systemTtl;
96 96
  97 + @Value("${cassandra.query.set_null_values_enabled}")
  98 + private boolean setNullValuesEnabled;
  99 +
97 100 private TsPartitionDate tsFormat;
98 101
99 102 private PreparedStatement partitionInsertStmt;
... ... @@ -307,9 +310,13 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
307 310
308 311 @Override
309 312 public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
  313 + List<ListenableFuture<Void>> futures = new ArrayList<>();
310 314 ttl = computeTtl(ttl);
311 315 long partition = toPartitionTs(tsKvEntry.getTs());
312 316 DataType type = tsKvEntry.getDataType();
  317 + if (setNullValuesEnabled) {
  318 + processSetNullValues(tenantId, entityId, tsKvEntry, ttl, futures, partition, type);
  319 + }
313 320 BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
314 321 stmt.setString(0, entityId.getEntityType().name())
315 322 .setUUID(1, entityId.getId())
... ... @@ -320,6 +327,46 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
320 327 if (ttl > 0) {
321 328 stmt.setInt(6, (int) ttl);
322 329 }
  330 + futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null));
  331 + return Futures.transform(Futures.allAsList(futures), result -> null);
  332 + }
  333 +
  334 + private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List<ListenableFuture<Void>> futures, long partition, DataType type) {
  335 + switch (type) {
  336 + case LONG:
  337 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
  338 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
  339 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
  340 + break;
  341 + case BOOLEAN:
  342 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
  343 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
  344 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
  345 + break;
  346 + case DOUBLE:
  347 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
  348 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
  349 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
  350 + break;
  351 + case STRING:
  352 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
  353 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
  354 + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
  355 + break;
  356 + }
  357 + }
  358 +
  359 + private ListenableFuture<Void> saveNull(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, long partition, DataType type) {
  360 + BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
  361 + stmt.setString(0, entityId.getEntityType().name())
  362 + .setUUID(1, entityId.getId())
  363 + .setString(2, tsKvEntry.getKey())
  364 + .setLong(3, partition)
  365 + .setLong(4, tsKvEntry.getTs());
  366 + stmt.setToNull(getColumnName(type));
  367 + if (ttl > 0) {
  368 + stmt.setInt(6, (int) ttl);
  369 + }
323 370 return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null);
324 371 }
325 372
... ...
... ... @@ -53,6 +53,7 @@ cassandra.query.buffer_size=100000
53 53 cassandra.query.concurrent_limit=1000
54 54 cassandra.query.permit_max_wait_time=20000
55 55 cassandra.query.rate_limit_print_interval_ms=30000
  56 +cassandra.query.set_null_values_enabled=false
56 57 cassandra.query.tenant_rate_limits.enabled=false
57 58 cassandra.query.tenant_rate_limits.configuration=5000:1,100000:60
58 59 cassandra.query.tenant_rate_limits.print_tenant_names=false
... ...