...
|
...
|
@@ -94,6 +94,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
94
|
94
|
@Value("${cassandra.query.ts_key_value_ttl}")
|
95
|
95
|
private long systemTtl;
|
96
|
96
|
|
|
97
|
+ @Value("${cassandra.query.set_null_values_enabled}")
|
|
98
|
+ private boolean setNullValuesEnabled;
|
|
99
|
+
|
97
|
100
|
private TsPartitionDate tsFormat;
|
98
|
101
|
|
99
|
102
|
private PreparedStatement partitionInsertStmt;
|
...
|
...
|
@@ -307,9 +310,13 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
307
|
310
|
|
308
|
311
|
@Override
|
309
|
312
|
public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
|
313
|
+ List<ListenableFuture<Void>> futures = new ArrayList<>();
|
310
|
314
|
ttl = computeTtl(ttl);
|
311
|
315
|
long partition = toPartitionTs(tsKvEntry.getTs());
|
312
|
316
|
DataType type = tsKvEntry.getDataType();
|
|
317
|
+ if (setNullValuesEnabled) {
|
|
318
|
+ processSetNullValues(tenantId, entityId, tsKvEntry, ttl, futures, partition, type);
|
|
319
|
+ }
|
313
|
320
|
BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
|
314
|
321
|
stmt.setString(0, entityId.getEntityType().name())
|
315
|
322
|
.setUUID(1, entityId.getId())
|
...
|
...
|
@@ -320,6 +327,46 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem |
320
|
327
|
if (ttl > 0) {
|
321
|
328
|
stmt.setInt(6, (int) ttl);
|
322
|
329
|
}
|
|
330
|
+ futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null));
|
|
331
|
+ return Futures.transform(Futures.allAsList(futures), result -> null);
|
|
332
|
+ }
|
|
333
|
+
|
|
334
|
+ private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List<ListenableFuture<Void>> futures, long partition, DataType type) {
|
|
335
|
+ switch (type) {
|
|
336
|
+ case LONG:
|
|
337
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
|
|
338
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
|
|
339
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
|
|
340
|
+ break;
|
|
341
|
+ case BOOLEAN:
|
|
342
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
|
|
343
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
|
|
344
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
|
|
345
|
+ break;
|
|
346
|
+ case DOUBLE:
|
|
347
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
|
|
348
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
|
|
349
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING));
|
|
350
|
+ break;
|
|
351
|
+ case STRING:
|
|
352
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN));
|
|
353
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE));
|
|
354
|
+ futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG));
|
|
355
|
+ break;
|
|
356
|
+ }
|
|
357
|
+ }
|
|
358
|
+
|
|
359
|
+ private ListenableFuture<Void> saveNull(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, long partition, DataType type) {
|
|
360
|
+ BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
|
|
361
|
+ stmt.setString(0, entityId.getEntityType().name())
|
|
362
|
+ .setUUID(1, entityId.getId())
|
|
363
|
+ .setString(2, tsKvEntry.getKey())
|
|
364
|
+ .setLong(3, partition)
|
|
365
|
+ .setLong(4, tsKvEntry.getTs());
|
|
366
|
+ stmt.setToNull(getColumnName(type));
|
|
367
|
+ if (ttl > 0) {
|
|
368
|
+ stmt.setInt(6, (int) ttl);
|
|
369
|
+ }
|
323
|
370
|
return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null);
|
324
|
371
|
}
|
325
|
372
|
|
...
|
...
|
|