Commit 4486d07f6d4f155261ba29531d98035db2f28eb3

Authored by Volodymyr Babak
1 parent 08f7e48f

Get rid of cassandra specific classes in timeseries dao and timeseries service interfaces

@@ -16,9 +16,6 @@ @@ -16,9 +16,6 @@
16 package org.thingsboard.server.actors.plugin; 16 package org.thingsboard.server.actors.plugin;
17 17
18 import akka.actor.ActorRef; 18 import akka.actor.ActorRef;
19 -import com.datastax.driver.core.ResultSet;  
20 -import com.datastax.driver.core.ResultSetFuture;  
21 -import com.datastax.driver.core.Row;  
22 import com.google.common.base.Function; 19 import com.google.common.base.Function;
23 import com.google.common.util.concurrent.FutureCallback; 20 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures; 21 import com.google.common.util.concurrent.Futures;
@@ -159,7 +156,7 @@ public final class PluginProcessingContext implements PluginContext { @@ -159,7 +156,7 @@ public final class PluginProcessingContext implements PluginContext {
159 @Override 156 @Override
160 public void saveTsData(final EntityId entityId, final TsKvEntry entry, final PluginCallback<Void> callback) { 157 public void saveTsData(final EntityId entityId, final TsKvEntry entry, final PluginCallback<Void> callback) {
161 validate(entityId, new ValidationCallback(callback, ctx -> { 158 validate(entityId, new ValidationCallback(callback, ctx -> {
162 - ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entry); 159 + ListenableFuture<List<Void>> rsListFuture = pluginCtx.tsService.save(entityId, entry);
163 Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor); 160 Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
164 })); 161 }));
165 } 162 }
@@ -172,7 +169,7 @@ public final class PluginProcessingContext implements PluginContext { @@ -172,7 +169,7 @@ public final class PluginProcessingContext implements PluginContext {
172 @Override 169 @Override
173 public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, long ttl, final PluginCallback<Void> callback) { 170 public void saveTsData(final EntityId entityId, final List<TsKvEntry> entries, long ttl, final PluginCallback<Void> callback) {
174 validate(entityId, new ValidationCallback(callback, ctx -> { 171 validate(entityId, new ValidationCallback(callback, ctx -> {
175 - ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl); 172 + ListenableFuture<List<Void>> rsListFuture = pluginCtx.tsService.save(entityId, entries, ttl);
176 Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor); 173 Futures.addCallback(rsListFuture, getListCallback(callback, v -> null), executor);
177 })); 174 }));
178 } 175 }
@@ -189,26 +186,16 @@ public final class PluginProcessingContext implements PluginContext { @@ -189,26 +186,16 @@ public final class PluginProcessingContext implements PluginContext {
189 @Override 186 @Override
190 public void loadLatestTimeseries(final EntityId entityId, final PluginCallback<List<TsKvEntry>> callback) { 187 public void loadLatestTimeseries(final EntityId entityId, final PluginCallback<List<TsKvEntry>> callback) {
191 validate(entityId, new ValidationCallback(callback, ctx -> { 188 validate(entityId, new ValidationCallback(callback, ctx -> {
192 - ResultSetFuture future = pluginCtx.tsService.findAllLatest(entityId);  
193 - Futures.addCallback(future, getCallback(callback, pluginCtx.tsService::convertResultSetToTsKvEntryList), executor); 189 + ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAllLatest(entityId);
  190 + Futures.addCallback(future, getCallback(callback, v -> v), executor);
194 })); 191 }));
195 } 192 }
196 193
197 @Override 194 @Override
198 public void loadLatestTimeseries(final EntityId entityId, final Collection<String> keys, final PluginCallback<List<TsKvEntry>> callback) { 195 public void loadLatestTimeseries(final EntityId entityId, final Collection<String> keys, final PluginCallback<List<TsKvEntry>> callback) {
199 validate(entityId, new ValidationCallback(callback, ctx -> { 196 validate(entityId, new ValidationCallback(callback, ctx -> {
200 - ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.tsService.findLatest(entityId, keys);  
201 - Futures.addCallback(rsListFuture, getListCallback(callback, rsList ->  
202 - {  
203 - List<TsKvEntry> result = new ArrayList<>();  
204 - for (ResultSet rs : rsList) {  
205 - Row row = rs.one();  
206 - if (row != null) {  
207 - result.add(pluginCtx.tsService.convertResultToTsKvEntry(row));  
208 - }  
209 - }  
210 - return result;  
211 - }), executor); 197 + ListenableFuture<List<TsKvEntry>> rsListFuture = pluginCtx.tsService.findLatest(entityId, keys);
  198 + Futures.addCallback(rsListFuture, getCallback(callback, v -> v), executor);
212 })); 199 }));
213 } 200 }
214 201
@@ -153,7 +153,7 @@ cassandra: @@ -153,7 +153,7 @@ cassandra:
153 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS 153 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
154 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" 154 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
155 # Specify max data points per request 155 # Specify max data points per request
156 - min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:100}" 156 + min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:1000}"
157 157
158 # Actor system parameters 158 # Actor system parameters
159 actors: 159 actors:
@@ -188,7 +188,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati @@ -188,7 +188,7 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
188 QueryBuilder.asc(ModelConstants.RELATION_TO_TYPE_PROPERTY) 188 QueryBuilder.asc(ModelConstants.RELATION_TO_TYPE_PROPERTY)
189 ), 189 ),
190 pageLink, ModelConstants.RELATION_TO_ID_PROPERTY); 190 pageLink, ModelConstants.RELATION_TO_ID_PROPERTY);
191 - return getFuture(executeAsyncRead(query), rs -> getEntityRelations(rs)); 191 + return getFuture(executeAsyncRead(query), this::getEntityRelations);
192 } 192 }
193 193
194 private PreparedStatement getSaveStmt() { 194 private PreparedStatement getSaveStmt() {
@@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.dao.sql.timeseries; 16 package org.thingsboard.server.dao.sql.timeseries;
17 17
18 -import com.datastax.driver.core.ResultSetFuture;  
19 -import com.datastax.driver.core.Row;  
20 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
21 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
22 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 20 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -44,37 +42,28 @@ public class JpaTimeseriesDao implements TimeseriesDao { @@ -44,37 +42,28 @@ public class JpaTimeseriesDao implements TimeseriesDao {
44 } 42 }
45 43
46 @Override 44 @Override
47 - public ResultSetFuture findLatest(EntityId entityId, String key) { 45 + public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
48 return null; 46 return null;
49 } 47 }
50 48
51 @Override 49 @Override
52 - public ResultSetFuture findAllLatest(EntityId entityId) { 50 + public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
53 return null; 51 return null;
54 } 52 }
55 53
56 @Override 54 @Override
57 - public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) { 55 + public ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
58 return null; 56 return null;
59 } 57 }
60 58
61 @Override 59 @Override
62 - public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) { 60 + public ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl) {
63 return null; 61 return null;
64 } 62 }
65 63
66 @Override 64 @Override
67 - public ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { 65 + public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
68 return null; 66 return null;
69 } 67 }
70 68
71 - @Override  
72 - public TsKvEntry convertResultToTsKvEntry(Row row) {  
73 - return null;  
74 - }  
75 -  
76 - @Override  
77 - public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {  
78 - return null;  
79 - }  
80 } 69 }
@@ -15,9 +15,6 @@ @@ -15,9 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.dao.timeseries; 16 package org.thingsboard.server.dao.timeseries;
17 17
18 -import com.datastax.driver.core.ResultSet;  
19 -import com.datastax.driver.core.ResultSetFuture;  
20 -import com.datastax.driver.core.Row;  
21 import com.google.common.collect.Lists; 18 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.Futures; 19 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture; 20 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +29,6 @@ import org.thingsboard.server.dao.service.Validator; @@ -32,7 +29,6 @@ import org.thingsboard.server.dao.service.Validator;
32 29
33 import java.util.Collection; 30 import java.util.Collection;
34 import java.util.List; 31 import java.util.List;
35 -import java.util.UUID;  
36 32
37 import static org.apache.commons.lang3.StringUtils.isBlank; 33 import static org.apache.commons.lang3.StringUtils.isBlank;
38 34
@@ -56,42 +52,35 @@ public class BaseTimeseriesService implements TimeseriesService { @@ -56,42 +52,35 @@ public class BaseTimeseriesService implements TimeseriesService {
56 } 52 }
57 53
58 @Override 54 @Override
59 - public ListenableFuture<List<ResultSet>> findLatest(EntityId entityId, Collection<String> keys) { 55 + public ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys) {
60 validate(entityId); 56 validate(entityId);
61 - List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(keys.size()); 57 + List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size());
62 keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); 58 keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key));
63 keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityId, key))); 59 keys.forEach(key -> futures.add(timeseriesDao.findLatest(entityId, key)));
64 return Futures.allAsList(futures); 60 return Futures.allAsList(futures);
65 } 61 }
66 62
67 @Override 63 @Override
68 - public ResultSetFuture findAllLatest(EntityId entityId) { 64 + public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
69 validate(entityId); 65 validate(entityId);
70 return timeseriesDao.findAllLatest(entityId); 66 return timeseriesDao.findAllLatest(entityId);
71 } 67 }
72 68
73 @Override 69 @Override
74 - public ListenableFuture<List<ResultSet>> save(EntityId entityId, TsKvEntry tsKvEntry) { 70 + public ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry) {
75 validate(entityId); 71 validate(entityId);
76 if (tsKvEntry == null) { 72 if (tsKvEntry == null) {
77 throw new IncorrectParameterException("Key value entry can't be null"); 73 throw new IncorrectParameterException("Key value entry can't be null");
78 } 74 }
79 long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs()); 75 long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
80 -  
81 - List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); 76 + List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
82 saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L); 77 saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L);
83 return Futures.allAsList(futures); 78 return Futures.allAsList(futures);
84 } 79 }
85 80
86 @Override 81 @Override
87 - public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries) {  
88 - return save(entityId, tsKvEntries, 0L);  
89 - }  
90 -  
91 - @Override  
92 - public ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {  
93 - validate(entityId);  
94 - List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY); 82 + public ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
  83 + List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY);
95 for (TsKvEntry tsKvEntry : tsKvEntries) { 84 for (TsKvEntry tsKvEntry : tsKvEntries) {
96 if (tsKvEntry == null) { 85 if (tsKvEntry == null) {
97 throw new IncorrectParameterException("Key value entry can't be null"); 86 throw new IncorrectParameterException("Key value entry can't be null");
@@ -102,18 +91,7 @@ public class BaseTimeseriesService implements TimeseriesService { @@ -102,18 +91,7 @@ public class BaseTimeseriesService implements TimeseriesService {
102 return Futures.allAsList(futures); 91 return Futures.allAsList(futures);
103 } 92 }
104 93
105 -  
106 - @Override  
107 - public TsKvEntry convertResultToTsKvEntry(Row row) {  
108 - return timeseriesDao.convertResultToTsKvEntry(row);  
109 - }  
110 -  
111 - @Override  
112 - public List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs) {  
113 - return timeseriesDao.convertResultToTsKvEntryList(rs.all());  
114 - }  
115 -  
116 - private void saveAndRegisterFutures(List<ResultSetFuture> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) { 94 + private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
117 futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl)); 95 futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl));
118 futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry)); 96 futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
119 futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl)); 97 futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl));
@@ -51,12 +51,11 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; @@ -51,12 +51,11 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
51 */ 51 */
52 @Component 52 @Component
53 @Slf4j 53 @Slf4j
54 -@ConditionalOnProperty(prefix = "cassandra", value = "enabled", havingValue = "true", matchIfMissing = false) 54 +@ConditionalOnProperty(prefix = "cassandra", value = "enabled", havingValue = "true")
55 public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implements TimeseriesDao { 55 public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implements TimeseriesDao {
56 56
57 - //@Value("${cassandra.query.min_aggregation_step_ms}")  
58 - //TODO:  
59 - private int minAggregationStepMs = 1000; 57 + @Value("${cassandra.query.min_aggregation_step_ms}")
  58 + private int minAggregationStepMs;
60 59
61 @Value("${cassandra.query.ts_key_value_partitioning}") 60 @Value("${cassandra.query.ts_key_value_partitioning}")
62 private String partitioning; 61 private String partitioning;
@@ -103,9 +102,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -103,9 +102,12 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
103 @Nullable 102 @Nullable
104 @Override 103 @Override
105 public List<TsKvEntry> apply(@Nullable List<List<TsKvEntry>> results) { 104 public List<TsKvEntry> apply(@Nullable List<List<TsKvEntry>> results) {
106 - List<TsKvEntry> result = new ArrayList<TsKvEntry>();  
107 - results.forEach(r -> result.addAll(r));  
108 - return result; 105 + if (results == null || results.isEmpty()) {
  106 + return null;
  107 + }
  108 + return results.stream()
  109 + .flatMap(List::stream)
  110 + .collect(Collectors.toList());
109 } 111 }
110 }, readResultsProcessingExecutor); 112 }, readResultsProcessingExecutor);
111 } 113 }
@@ -238,26 +240,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -238,26 +240,26 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
238 } 240 }
239 241
240 @Override 242 @Override
241 - public ResultSetFuture findLatest(EntityId entityId, String key) { 243 + public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
242 BoundStatement stmt = getFindLatestStmt().bind(); 244 BoundStatement stmt = getFindLatestStmt().bind();
243 stmt.setString(0, entityId.getEntityType().name()); 245 stmt.setString(0, entityId.getEntityType().name());
244 stmt.setUUID(1, entityId.getId()); 246 stmt.setUUID(1, entityId.getId());
245 stmt.setString(2, key); 247 stmt.setString(2, key);
246 log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId()); 248 log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId());
247 - return executeAsyncRead(stmt); 249 + return getFuture(executeAsyncRead(stmt), rs -> convertResultToTsKvEntry(rs.one()));
248 } 250 }
249 251
250 @Override 252 @Override
251 - public ResultSetFuture findAllLatest(EntityId entityId) { 253 + public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
252 BoundStatement stmt = getFindAllLatestStmt().bind(); 254 BoundStatement stmt = getFindAllLatestStmt().bind();
253 stmt.setString(0, entityId.getEntityType().name()); 255 stmt.setString(0, entityId.getEntityType().name());
254 stmt.setUUID(1, entityId.getId()); 256 stmt.setUUID(1, entityId.getId());
255 log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId()); 257 log.debug("Generated query [{}] for entityType {} and entityId {}", stmt, entityId.getEntityType(), entityId.getId());
256 - return executeAsyncRead(stmt); 258 + return getFuture(executeAsyncRead(stmt), rs -> convertResultToTsKvEntryList(rs.all()));
257 } 259 }
258 260
259 @Override 261 @Override
260 - public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) { 262 + public ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
261 DataType type = tsKvEntry.getDataType(); 263 DataType type = tsKvEntry.getDataType();
262 BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind(); 264 BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
263 stmt.setString(0, entityId.getEntityType().name()) 265 stmt.setString(0, entityId.getEntityType().name())
@@ -269,11 +271,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -269,11 +271,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
269 if (ttl > 0) { 271 if (ttl > 0) {
270 stmt.setInt(6, (int) ttl); 272 stmt.setInt(6, (int) ttl);
271 } 273 }
272 - return executeAsyncWrite(stmt); 274 + return getFuture(executeAsyncWrite(stmt), rs -> null);
273 } 275 }
274 276
275 @Override 277 @Override
276 - public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) { 278 + public ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl) {
277 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key); 279 log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
278 BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind(); 280 BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
279 stmt = stmt.setString(0, entityId.getEntityType().name()) 281 stmt = stmt.setString(0, entityId.getEntityType().name())
@@ -283,11 +285,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -283,11 +285,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
283 if (ttl > 0) { 285 if (ttl > 0) {
284 stmt.setInt(4, (int) ttl); 286 stmt.setInt(4, (int) ttl);
285 } 287 }
286 - return executeAsyncWrite(stmt); 288 + return getFuture(executeAsyncWrite(stmt), rs -> null);
287 } 289 }
288 290
289 @Override 291 @Override
290 - public ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry) { 292 + public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
291 DataType type = tsKvEntry.getDataType(); 293 DataType type = tsKvEntry.getDataType();
292 BoundStatement stmt = getLatestStmt(type).bind() 294 BoundStatement stmt = getLatestStmt(type).bind()
293 .setString(0, entityId.getEntityType().name()) 295 .setString(0, entityId.getEntityType().name())
@@ -295,25 +297,18 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -295,25 +297,18 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
295 .setString(2, tsKvEntry.getKey()) 297 .setString(2, tsKvEntry.getKey())
296 .setLong(3, tsKvEntry.getTs()); 298 .setLong(3, tsKvEntry.getTs());
297 addValue(tsKvEntry, stmt, 4); 299 addValue(tsKvEntry, stmt, 4);
298 - return executeAsyncWrite(stmt); 300 + return getFuture(executeAsyncWrite(stmt), rs -> null);
299 } 301 }
300 302
301 - @Override  
302 - public List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) { 303 + private List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows) {
303 List<TsKvEntry> entries = new ArrayList<>(rows.size()); 304 List<TsKvEntry> entries = new ArrayList<>(rows.size());
304 if (!rows.isEmpty()) { 305 if (!rows.isEmpty()) {
305 - rows.forEach(row -> {  
306 - TsKvEntry kvEntry = convertResultToTsKvEntry(row);  
307 - if (kvEntry != null) {  
308 - entries.add(kvEntry);  
309 - }  
310 - }); 306 + rows.forEach(row -> entries.add(convertResultToTsKvEntry(row)));
311 } 307 }
312 return entries; 308 return entries;
313 } 309 }
314 310
315 - @Override  
316 - public TsKvEntry convertResultToTsKvEntry(Row row) { 311 + private TsKvEntry convertResultToTsKvEntry(Row row) {
317 String key = row.getString(ModelConstants.KEY_COLUMN); 312 String key = row.getString(ModelConstants.KEY_COLUMN);
318 long ts = row.getLong(ModelConstants.TS_COLUMN); 313 long ts = row.getLong(ModelConstants.TS_COLUMN);
319 return new BasicTsKvEntry(ts, toKvEntry(row, key)); 314 return new BasicTsKvEntry(ts, toKvEntry(row, key));
@@ -490,7 +485,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -490,7 +485,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
490 return findAllLatestStmt; 485 return findAllLatestStmt;
491 } 486 }
492 487
493 - public static String getColumnName(DataType type) { 488 + private static String getColumnName(DataType type) {
494 switch (type) { 489 switch (type) {
495 case BOOLEAN: 490 case BOOLEAN:
496 return ModelConstants.BOOLEAN_VALUE_COLUMN; 491 return ModelConstants.BOOLEAN_VALUE_COLUMN;
@@ -505,7 +500,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @@ -505,7 +500,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
505 } 500 }
506 } 501 }
507 502
508 - public static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) { 503 + private static void addValue(KvEntry kvEntry, BoundStatement stmt, int column) {
509 switch (kvEntry.getDataType()) { 504 switch (kvEntry.getDataType()) {
510 case BOOLEAN: 505 case BOOLEAN:
511 stmt.setBool(column, kvEntry.getBooleanValue().get().booleanValue()); 506 stmt.setBool(column, kvEntry.getBooleanValue().get().booleanValue());
@@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
15 */ 15 */
16 package org.thingsboard.server.dao.timeseries; 16 package org.thingsboard.server.dao.timeseries;
17 17
18 -import com.datastax.driver.core.ResultSetFuture;  
19 -import com.datastax.driver.core.Row;  
20 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
21 import org.thingsboard.server.common.data.id.EntityId; 19 import org.thingsboard.server.common.data.id.EntityId;
22 import org.thingsboard.server.common.data.kv.TsKvEntry; 20 import org.thingsboard.server.common.data.kv.TsKvEntry;
@@ -33,18 +31,13 @@ public interface TimeseriesDao { @@ -33,18 +31,13 @@ public interface TimeseriesDao {
33 31
34 ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries); 32 ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries);
35 33
36 - ResultSetFuture findLatest(EntityId entityId, String key); 34 + ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key);
37 35
38 - ResultSetFuture findAllLatest(EntityId entityId); 36 + ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId);
39 37
40 - ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl); 38 + ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
41 39
42 - ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl);  
43 -  
44 - ResultSetFuture saveLatest(EntityId entityId, TsKvEntry tsKvEntry);  
45 -  
46 - TsKvEntry convertResultToTsKvEntry(Row row);  
47 -  
48 - List<TsKvEntry> convertResultToTsKvEntryList(List<Row> rows); 40 + ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl);
49 41
  42 + ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
50 } 43 }
@@ -15,12 +15,8 @@ @@ -15,12 +15,8 @@
15 */ 15 */
16 package org.thingsboard.server.dao.timeseries; 16 package org.thingsboard.server.dao.timeseries;
17 17
18 -import com.datastax.driver.core.ResultSet;  
19 -import com.datastax.driver.core.ResultSetFuture;  
20 -import com.datastax.driver.core.Row;  
21 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
22 import org.thingsboard.server.common.data.id.EntityId; 19 import org.thingsboard.server.common.data.id.EntityId;
23 -import org.thingsboard.server.common.data.id.UUIDBased;  
24 import org.thingsboard.server.common.data.kv.TsKvEntry; 20 import org.thingsboard.server.common.data.kv.TsKvEntry;
25 import org.thingsboard.server.common.data.kv.TsKvQuery; 21 import org.thingsboard.server.common.data.kv.TsKvQuery;
26 22
@@ -34,18 +30,11 @@ public interface TimeseriesService { @@ -34,18 +30,11 @@ public interface TimeseriesService {
34 30
35 ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries); 31 ListenableFuture<List<TsKvEntry>> findAll(EntityId entityId, List<TsKvQuery> queries);
36 32
37 - ListenableFuture<List<ResultSet>> findLatest(EntityId entityId, Collection<String> keys); 33 + ListenableFuture<List<TsKvEntry>> findLatest(EntityId entityId, Collection<String> keys);
38 34
39 - ResultSetFuture findAllLatest(EntityId entityId); 35 + ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId);
40 36
41 - ListenableFuture<List<ResultSet>> save(EntityId entityId, TsKvEntry tsKvEntry);  
42 -  
43 - ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry);  
44 -  
45 - ListenableFuture<List<ResultSet>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);  
46 -  
47 - TsKvEntry convertResultToTsKvEntry(Row row);  
48 -  
49 - List<TsKvEntry> convertResultSetToTsKvEntryList(ResultSet rs); 37 + ListenableFuture<List<Void>> save(EntityId entityId, TsKvEntry tsKvEntry);
50 38
  39 + ListenableFuture<List<Void>> save(EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
51 } 40 }
@@ -15,13 +15,10 @@ @@ -15,13 +15,10 @@
15 */ 15 */
16 package org.thingsboard.server.dao.timeseries; 16 package org.thingsboard.server.dao.timeseries;
17 17
18 -import com.datastax.driver.core.ResultSet;  
19 -import com.datastax.driver.core.ResultSetFuture;  
20 import com.datastax.driver.core.utils.UUIDs; 18 import com.datastax.driver.core.utils.UUIDs;
21 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
22 import org.junit.Assert; 20 import org.junit.Assert;
23 import org.junit.Test; 21 import org.junit.Test;
24 -import org.thingsboard.server.common.data.DataConstants;  
25 import org.thingsboard.server.common.data.id.DeviceId; 22 import org.thingsboard.server.common.data.id.DeviceId;
26 import org.thingsboard.server.common.data.kv.*; 23 import org.thingsboard.server.common.data.kv.*;
27 import org.thingsboard.server.dao.service.AbstractServiceTest; 24 import org.thingsboard.server.dao.service.AbstractServiceTest;
@@ -62,8 +59,7 @@ public class TimeseriesServiceTest extends AbstractServiceTest { @@ -62,8 +59,7 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
62 saveEntries(deviceId, TS - 1); 59 saveEntries(deviceId, TS - 1);
63 saveEntries(deviceId, TS); 60 saveEntries(deviceId, TS);
64 61
65 - ResultSetFuture rsFuture = tsService.findAllLatest(deviceId);  
66 - List<TsKvEntry> tsList = tsService.convertResultSetToTsKvEntryList(rsFuture.get()); 62 + List<TsKvEntry> tsList = tsService.findAllLatest(deviceId).get();
67 63
68 assertNotNull(tsList); 64 assertNotNull(tsList);
69 assertEquals(4, tsList.size()); 65 assertEquals(4, tsList.size());
@@ -91,9 +87,9 @@ public class TimeseriesServiceTest extends AbstractServiceTest { @@ -91,9 +87,9 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
91 saveEntries(deviceId, TS - 1); 87 saveEntries(deviceId, TS - 1);
92 saveEntries(deviceId, TS); 88 saveEntries(deviceId, TS);
93 89
94 - List<ResultSet> rs = tsService.findLatest(deviceId, Collections.singleton(STRING_KEY)).get();  
95 - Assert.assertEquals(1, rs.size());  
96 - Assert.assertEquals(toTsEntry(TS, stringKvEntry), tsService.convertResultToTsKvEntry(rs.get(0).one())); 90 + List<TsKvEntry> entries = tsService.findLatest(deviceId, Collections.singleton(STRING_KEY)).get();
  91 + Assert.assertEquals(1, entries.size());
  92 + Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0));
97 } 93 }
98 94
99 @Test 95 @Test
@@ -48,4 +48,4 @@ cassandra.query.ts_key_value_partitioning=HOURS @@ -48,4 +48,4 @@ cassandra.query.ts_key_value_partitioning=HOURS
48 48
49 cassandra.query.max_limit_per_request=1000 49 cassandra.query.max_limit_per_request=1000
50 50
51 -cassandra.query.min_aggregation_step_ms=100  
  51 +cassandra.query.min_aggregation_step_ms=1000