Commit 2d235cf9eb14be7fe015db3ebcdb14fcb1dd0cb0

Authored by Igor Kulikov
1 parent 34da0fe8

Improve Cassandra resultSet pagination

... ... @@ -176,9 +176,9 @@ cassandra:
176 176 # Enable/disable secure connection
177 177 ssl: "${CASSANDRA_USE_SSL:false}"
178 178 # Enable/disable JMX
179   - jmx: "${CASSANDRA_USE_JMX:true}"
  179 + jmx: "${CASSANDRA_USE_JMX:false}"
180 180 # Enable/disable metrics collection.
181   - metrics: "${CASSANDRA_USE_METRICS:true}"
  181 + metrics: "${CASSANDRA_USE_METRICS:false}"
182 182 # NONE SNAPPY LZ4
183 183 compression: "${CASSANDRA_COMPRESSION:none}"
184 184 # Specify cassandra cluster initialization timeout in milliseconds (if no hosts available during startup)
... ...
... ... @@ -16,11 +16,16 @@
16 16 package org.thingsboard.server.dao.nosql;
17 17
18 18 import com.datastax.oss.driver.api.core.cql.Statement;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import com.google.common.util.concurrent.MoreExecutors;
19 22 import lombok.Data;
20 23 import org.thingsboard.server.common.data.id.TenantId;
21 24 import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
22 25 import org.thingsboard.server.dao.util.AsyncTask;
23 26
  27 +import java.util.function.Function;
  28 +
24 29 /**
25 30 * Created by ashvayka on 24.10.18.
26 31 */
... ... @@ -31,4 +36,11 @@ public class CassandraStatementTask implements AsyncTask {
31 36 private final GuavaSession session;
32 37 private final Statement statement;
33 38
  39 + public ListenableFuture<TbResultSet> executeAsync(Function<Statement, TbResultSetFuture> executeAsyncFunction) {
  40 + return Futures.transform(session.executeAsync(statement),
  41 + result -> new TbResultSet(statement, result, executeAsyncFunction),
  42 + MoreExecutors.directExecutor()
  43 + );
  44 + }
  45 +
34 46 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.nosql;
  17 +
  18 +import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
  19 +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
  20 +import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
  21 +import com.datastax.oss.driver.api.core.cql.Row;
  22 +import com.datastax.oss.driver.api.core.cql.Statement;
  23 +import com.google.common.collect.Lists;
  24 +import com.google.common.util.concurrent.FutureCallback;
  25 +import com.google.common.util.concurrent.Futures;
  26 +import com.google.common.util.concurrent.ListenableFuture;
  27 +import com.google.common.util.concurrent.MoreExecutors;
  28 +import com.google.common.util.concurrent.SettableFuture;
  29 +import edu.umd.cs.findbugs.annotations.NonNull;
  30 +import org.checkerframework.checker.nullness.qual.Nullable;
  31 +
  32 +import java.nio.ByteBuffer;
  33 +import java.util.ArrayList;
  34 +import java.util.List;
  35 +import java.util.concurrent.CompletionStage;
  36 +import java.util.concurrent.Executor;
  37 +import java.util.function.Function;
  38 +
  39 +public class TbResultSet implements AsyncResultSet {
  40 +
  41 + private final Statement originalStatement;
  42 + private final AsyncResultSet delegate;
  43 + private final Function<Statement, TbResultSetFuture> executeAsyncFunction;
  44 +
  45 + public TbResultSet(Statement originalStatement, AsyncResultSet delegate,
  46 + Function<Statement, TbResultSetFuture> executeAsyncFunction) {
  47 + this.originalStatement = originalStatement;
  48 + this.delegate = delegate;
  49 + this.executeAsyncFunction = executeAsyncFunction;
  50 + }
  51 +
  52 + @NonNull
  53 + @Override
  54 + public ColumnDefinitions getColumnDefinitions() {
  55 + return delegate.getColumnDefinitions();
  56 + }
  57 +
  58 + @NonNull
  59 + @Override
  60 + public ExecutionInfo getExecutionInfo() {
  61 + return delegate.getExecutionInfo();
  62 + }
  63 +
  64 + @Override
  65 + public int remaining() {
  66 + return delegate.remaining();
  67 + }
  68 +
  69 + @NonNull
  70 + @Override
  71 + public Iterable<Row> currentPage() {
  72 + return delegate.currentPage();
  73 + }
  74 +
  75 + @Override
  76 + public boolean hasMorePages() {
  77 + return delegate.hasMorePages();
  78 + }
  79 +
  80 + @NonNull
  81 + @Override
  82 + public CompletionStage<AsyncResultSet> fetchNextPage() throws IllegalStateException {
  83 + return delegate.fetchNextPage();
  84 + }
  85 +
  86 + @Override
  87 + public boolean wasApplied() {
  88 + return delegate.wasApplied();
  89 + }
  90 +
  91 + public ListenableFuture<List<Row>> allRows(Executor executor) {
  92 + List<Row> allRows = new ArrayList<>();
  93 + SettableFuture<List<Row>> resultFuture = SettableFuture.create();
  94 + this.processRows(originalStatement, delegate, allRows, resultFuture, executor);
  95 + return resultFuture;
  96 + }
  97 +
  98 + private void processRows(Statement statement,
  99 + AsyncResultSet resultSet,
  100 + List<Row> allRows,
  101 + SettableFuture<List<Row>> resultFuture,
  102 + Executor executor) {
  103 + allRows.addAll(loadRows(resultSet));
  104 + if (resultSet.hasMorePages()) {
  105 + ByteBuffer nextPagingState = resultSet.getExecutionInfo().getPagingState();
  106 + Statement<?> nextStatement = statement.setPagingState(nextPagingState);
  107 + TbResultSetFuture resultSetFuture = executeAsyncFunction.apply(nextStatement);
  108 + Futures.addCallback(resultSetFuture,
  109 + new FutureCallback<TbResultSet>() {
  110 + @Override
  111 + public void onSuccess(@Nullable TbResultSet result) {
  112 + processRows(nextStatement, result,
  113 + allRows, resultFuture, executor);
  114 + }
  115 +
  116 + @Override
  117 + public void onFailure(Throwable t) {
  118 + resultFuture.setException(t);
  119 + }
  120 + }, executor != null ? executor : MoreExecutors.directExecutor()
  121 + );
  122 + } else {
  123 + resultFuture.set(allRows);
  124 + }
  125 + }
  126 +
  127 + List<Row> loadRows(AsyncResultSet resultSet) {
  128 + return Lists.newArrayList(resultSet.currentPage());
  129 + }
  130 +
  131 +}
... ...
... ... @@ -27,19 +27,19 @@ import java.util.concurrent.TimeoutException;
27 27 /**
28 28 * Created by ashvayka on 24.10.18.
29 29 */
30   -public class TbResultSetFuture implements ListenableFuture<AsyncResultSet> {
  30 +public class TbResultSetFuture implements ListenableFuture<TbResultSet> {
31 31
32   - private final SettableFuture<AsyncResultSet> mainFuture;
  32 + private final SettableFuture<TbResultSet> mainFuture;
33 33
34   - public TbResultSetFuture(SettableFuture<AsyncResultSet> mainFuture) {
  34 + public TbResultSetFuture(SettableFuture<TbResultSet> mainFuture) {
35 35 this.mainFuture = mainFuture;
36 36 }
37 37
38   - public AsyncResultSet getUninterruptibly() {
  38 + public TbResultSet getUninterruptibly() {
39 39 return getSafe();
40 40 }
41 41
42   - public AsyncResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
  42 + public TbResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException {
43 43 return getSafe(timeout, unit);
44 44 }
45 45
... ... @@ -59,12 +59,12 @@ public class TbResultSetFuture implements ListenableFuture<AsyncResultSet> {
59 59 }
60 60
61 61 @Override
62   - public AsyncResultSet get() throws InterruptedException, ExecutionException {
  62 + public TbResultSet get() throws InterruptedException, ExecutionException {
63 63 return mainFuture.get();
64 64 }
65 65
66 66 @Override
67   - public AsyncResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  67 + public TbResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
68 68 return mainFuture.get(timeout, unit);
69 69 }
70 70
... ... @@ -73,7 +73,7 @@ public class TbResultSetFuture implements ListenableFuture<AsyncResultSet> {
73 73 mainFuture.addListener(listener, executor);
74 74 }
75 75
76   - private AsyncResultSet getSafe() {
  76 + private TbResultSet getSafe() {
77 77 try {
78 78 return mainFuture.get();
79 79 } catch (InterruptedException | ExecutionException e) {
... ... @@ -81,7 +81,7 @@ public class TbResultSetFuture implements ListenableFuture<AsyncResultSet> {
81 81 }
82 82 }
83 83
84   - private AsyncResultSet getSafe(long timeout, TimeUnit unit) throws TimeoutException {
  84 + private TbResultSet getSafe(long timeout, TimeUnit unit) throws TimeoutException {
85 85 try {
86 86 return mainFuture.get(timeout, unit);
87 87 } catch (InterruptedException | ExecutionException e) {
... ...
... ... @@ -52,21 +52,21 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
52 52 }
53 53 }
54 54
55   - protected <T> ListenableFuture<T> getFuture(TbResultSetFuture future, java.util.function.Function<AsyncResultSet, T> transformer) {
56   - return Futures.transform(future, new Function<AsyncResultSet, T>() {
  55 + protected <T> ListenableFuture<T> getFuture(TbResultSetFuture future, java.util.function.Function<TbResultSet, T> transformer) {
  56 + return Futures.transform(future, new Function<TbResultSet, T>() {
57 57 @Nullable
58 58 @Override
59   - public T apply(@Nullable AsyncResultSet input) {
  59 + public T apply(@Nullable TbResultSet input) {
60 60 return transformer.apply(input);
61 61 }
62 62 }, readResultsProcessingExecutor);
63 63 }
64 64
65   - protected <T> ListenableFuture<T> getFutureAsync(TbResultSetFuture future, com.google.common.util.concurrent.AsyncFunction<AsyncResultSet, T> transformer) {
66   - return Futures.transformAsync(future, new AsyncFunction<AsyncResultSet, T>() {
  65 + protected <T> ListenableFuture<T> getFutureAsync(TbResultSetFuture future, com.google.common.util.concurrent.AsyncFunction<TbResultSet, T> transformer) {
  66 + return Futures.transformAsync(future, new AsyncFunction<TbResultSet, T>() {
67 67 @Nullable
68 68 @Override
69   - public ListenableFuture<T> apply(@Nullable AsyncResultSet input) {
  69 + public ListenableFuture<T> apply(@Nullable TbResultSet input) {
70 70 try {
71 71 return transformer.apply(input);
72 72 } catch (Exception e) {
... ... @@ -76,8 +76,4 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
76 76 }, readResultsProcessingExecutor);
77 77 }
78 78
79   - protected ListenableFuture<List<Row>> allRows(AsyncResultSet resultSet) {
80   - return ResultSetUtils.allRows(resultSet, readResultsProcessingExecutor);
81   - }
82   -
83 79 }
... ...
... ... @@ -39,7 +39,7 @@ import java.util.Map;
39 39 @Component
40 40 @Slf4j
41 41 @NoSqlAnyDao
42   -public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, AsyncResultSet> {
  42 +public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<CassandraStatementTask, TbResultSetFuture, TbResultSet> {
43 43
44 44 @Autowired
45 45 private EntityService entityService;
... ... @@ -107,19 +107,22 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
107 107 }
108 108
109 109 @Override
110   - protected SettableFuture<AsyncResultSet> create() {
  110 + protected SettableFuture<TbResultSet> create() {
111 111 return SettableFuture.create();
112 112 }
113 113
114 114 @Override
115   - protected TbResultSetFuture wrap(CassandraStatementTask task, SettableFuture<AsyncResultSet> future) {
  115 + protected TbResultSetFuture wrap(CassandraStatementTask task, SettableFuture<TbResultSet> future) {
116 116 return new TbResultSetFuture(future);
117 117 }
118 118
119 119 @Override
120   - protected ListenableFuture<AsyncResultSet> execute(AsyncTaskContext<CassandraStatementTask, AsyncResultSet> taskCtx) {
  120 + protected ListenableFuture<TbResultSet> execute(AsyncTaskContext<CassandraStatementTask, TbResultSet> taskCtx) {
121 121 CassandraStatementTask task = taskCtx.getTask();
122   - return task.getSession().executeAsync(task.getStatement());
  122 + return task.executeAsync(
  123 + statement ->
  124 + this.submit(new CassandraStatementTask(task.getTenantId(), task.getSession(), statement))
  125 + );
123 126 }
124 127
125 128 }
... ...
1   -/**
2   - * Copyright © 2016-2020 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.dao.nosql;
17   -
18   -import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
19   -import com.datastax.oss.driver.api.core.cql.Row;
20   -import com.google.common.collect.Lists;
21   -import com.google.common.util.concurrent.Futures;
22   -import com.google.common.util.concurrent.ListenableFuture;
23   -import com.google.common.util.concurrent.SettableFuture;
24   -
25   -import java.util.ArrayList;
26   -import java.util.List;
27   -import java.util.concurrent.CompletionStage;
28   -import java.util.concurrent.Executor;
29   -import java.util.stream.Collectors;
30   -
31   -public class ResultSetUtils {
32   -
33   - public static ListenableFuture<List<Row>> allRows(AsyncResultSet resultSet, Executor executor) {
34   - List<ListenableFuture<AsyncResultSet>> futures = new ArrayList<>();
35   - futures.add(Futures.immediateFuture(resultSet));
36   - while (resultSet.hasMorePages()) {
37   - futures.add(toListenable(resultSet.fetchNextPage()));
38   - }
39   - return Futures.transform( Futures.allAsList(futures),
40   - resultSets -> resultSets.stream()
41   - .map(rs -> loadRows(rs))
42   - .flatMap(rows -> rows.stream())
43   - .collect(Collectors.toList()),
44   - executor
45   - );
46   - }
47   -
48   - private static <T> ListenableFuture<T> toListenable(CompletionStage<T> completable) {
49   - SettableFuture<T> future = SettableFuture.create();
50   - completable.whenComplete(
51   - (r, ex) -> {
52   - if (ex != null) {
53   - future.setException(ex);
54   - } else {
55   - future.set(r);
56   - }
57   - }
58   - );
59   - return future;
60   - }
61   -
62   - private static List<Row> loadRows(AsyncResultSet resultSet) {
63   - return Lists.newArrayList(resultSet.currentPage());
64   - }
65   -}
... ... @@ -29,7 +29,7 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
29 29 import org.thingsboard.server.common.data.kv.LongDataEntry;
30 30 import org.thingsboard.server.common.data.kv.StringDataEntry;
31 31 import org.thingsboard.server.common.data.kv.TsKvEntry;
32   -import org.thingsboard.server.dao.nosql.ResultSetUtils;
  32 +import org.thingsboard.server.dao.nosql.TbResultSet;
33 33
34 34 import javax.annotation.Nullable;
35 35 import java.util.List;
... ... @@ -41,7 +41,7 @@ import java.util.stream.Collectors;
41 41 * Created by ashvayka on 20.02.17.
42 42 */
43 43 @Slf4j
44   -public class AggregatePartitionsFunction implements com.google.common.util.concurrent.AsyncFunction<List<AsyncResultSet>, Optional<TsKvEntry>> {
  44 +public class AggregatePartitionsFunction implements com.google.common.util.concurrent.AsyncFunction<List<TbResultSet>, Optional<TsKvEntry>> {
45 45
46 46 private static final int LONG_CNT_POS = 0;
47 47 private static final int DOUBLE_CNT_POS = 1;
... ... @@ -67,14 +67,14 @@ public class AggregatePartitionsFunction implements com.google.common.util.concu
67 67 }
68 68
69 69 @Override
70   - public ListenableFuture<Optional<TsKvEntry>> apply(@Nullable List<AsyncResultSet> rsList) {
  70 + public ListenableFuture<Optional<TsKvEntry>> apply(@Nullable List<TbResultSet> rsList) {
71 71 log.trace("[{}][{}][{}] Going to aggregate data", key, ts, aggregation);
72 72 if (rsList == null || rsList.isEmpty()) {
73 73 return Futures.immediateFuture(Optional.empty());
74 74 }
75 75 return Futures.transform(
76 76 Futures.allAsList(
77   - rsList.stream().map(rs -> ResultSetUtils.allRows(rs, this.executor))
  77 + rsList.stream().map(rs -> rs.allRows(this.executor))
78 78 .collect(Collectors.toList())),
79 79 rowsList -> {
80 80 try {
... ...
... ... @@ -52,6 +52,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
52 52 import org.thingsboard.server.common.data.kv.TsKvEntry;
53 53 import org.thingsboard.server.dao.model.ModelConstants;
54 54 import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao;
  55 +import org.thingsboard.server.dao.nosql.TbResultSet;
55 56 import org.thingsboard.server.dao.nosql.TbResultSetFuture;
56 57 import org.thingsboard.server.dao.util.NoSqlTsDao;
57 58
... ... @@ -238,14 +239,14 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
238 239
239 240 BoundStatement stmt = stmtBuilder.build();
240 241
241   - Futures.addCallback(executeAsyncRead(tenantId, stmt), new FutureCallback<AsyncResultSet>() {
  242 + Futures.addCallback(executeAsyncRead(tenantId, stmt), new FutureCallback<TbResultSet>() {
242 243 @Override
243   - public void onSuccess(@Nullable AsyncResultSet result) {
  244 + public void onSuccess(@Nullable TbResultSet result) {
244 245 if (result == null) {
245 246 cursor.addData(convertResultToTsKvEntryList(Collections.emptyList()));
246 247 findAllAsyncSequentiallyWithLimit(tenantId, cursor, resultFuture);
247 248 } else {
248   - Futures.addCallback(allRows(result), new FutureCallback<List<Row>>() {
  249 + Futures.addCallback(result.allRows(readResultsProcessingExecutor), new FutureCallback<List<Row>>() {
249 250
250 251 @Override
251 252 public void onSuccess(@Nullable List<Row> result) {
... ... @@ -278,21 +279,21 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
278 279 final long endTs = query.getEndTs();
279 280 final long ts = startTs + (endTs - startTs) / 2;
280 281 ListenableFuture<List<Long>> partitionsListFuture = getPartitionsFuture(tenantId, query, entityId, minPartition, maxPartition);
281   - ListenableFuture<List<AsyncResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
  282 + ListenableFuture<List<TbResultSet>> aggregationChunks = Futures.transformAsync(partitionsListFuture,
282 283 getFetchChunksAsyncFunction(tenantId, entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
283 284
284 285 return Futures.transformAsync(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts, readResultsProcessingExecutor), readResultsProcessingExecutor);
285 286 }
286 287
287   - private AsyncFunction<AsyncResultSet, List<Long>> getPartitionsArrayFunction() {
  288 + private AsyncFunction<TbResultSet, List<Long>> getPartitionsArrayFunction() {
288 289 return rs ->
289   - Futures.transform(allRows(rs), rows ->
  290 + Futures.transform(rs.allRows(readResultsProcessingExecutor), rows ->
290 291 rows.stream()
291 292 .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList()),
292 293 readResultsProcessingExecutor);
293 294 }
294 295
295   - private AsyncFunction<List<Long>, List<AsyncResultSet>> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
  296 + private AsyncFunction<List<Long>, List<TbResultSet>> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) {
296 297 return partitions -> {
297 298 try {
298 299 PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER);
... ... @@ -684,8 +685,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
684 685 return deletePartitionStmt;
685 686 }
686 687
687   - private ListenableFuture<List<TsKvEntry>> convertAsyncResultSetToTsKvEntryList(AsyncResultSet rs) {
688   - return Futures.transform(this.allRows(rs),
  688 + private ListenableFuture<List<TsKvEntry>> convertAsyncResultSetToTsKvEntryList(TbResultSet rs) {
  689 + return Futures.transform(rs.allRows(readResultsProcessingExecutor),
689 690 rows -> this.convertResultToTsKvEntryList(rows), readResultsProcessingExecutor);
690 691 }
691 692
... ...
... ... @@ -8,7 +8,7 @@ cassandra.ssl=false
8 8
9 9 cassandra.jmx=false
10 10
11   -cassandra.metrics=true
  11 +cassandra.metrics=false
12 12
13 13 cassandra.compression=none
14 14
... ... @@ -60,4 +60,4 @@ cassandra.query.tenant_rate_limits.enabled=false
60 60 cassandra.query.tenant_rate_limits.configuration=5000:1,100000:60
61 61 cassandra.query.tenant_rate_limits.print_tenant_names=false
62 62
63   -service.type=monolith
\ No newline at end of file
  63 +service.type=monolith
... ...