Commit 87d05f7c845d2f64e6fe93607e3a946d9146497f

Authored by Andrew Shvayka
1 parent f6bc0791

Implementation

1 1 /**
2 2 * Copyright © 2016-2017 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ...
... ... @@ -140,7 +140,7 @@ cassandra:
140 140 # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS
141 141 ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}"
142 142 # Specify max data points per request
143   - max_limit_per_request: "${TS_KV_MAX_LIMIT_PER_REQUEST:86400}"
  143 + min_aggregation_step_ms: "${TS_KV_MIN_AGGREGATION_STEP_MS:100}"
144 144
145 145 # Actor system parameters
146 146 actors:
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.common.data.kv;
2 17
3 18 /**
... ...
1 1 /**
2 2 * Copyright © 2016-2017 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ...
1 1 /**
2 2 * Copyright © 2016-2017 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -19,6 +19,7 @@ import java.util.UUID;
19 19
20 20 import com.datastax.driver.core.utils.UUIDs;
21 21 import org.apache.commons.lang3.ArrayUtils;
  22 +import org.thingsboard.server.common.data.kv.Aggregation;
22 23
23 24 public class ModelConstants {
24 25
... ... @@ -261,16 +262,17 @@ public class ModelConstants {
261 262 public static final String LONG_VALUE_COLUMN = "long_v";
262 263 public static final String DOUBLE_VALUE_COLUMN = "dbl_v";
263 264
  265 + public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN, KEY_COLUMN, TS_COLUMN};
  266 +
264 267 public static final String[] COUNT_AGGREGATION_COLUMNS = new String[]{count(LONG_VALUE_COLUMN), count(DOUBLE_VALUE_COLUMN), count(BOOLEAN_VALUE_COLUMN), count(STRING_VALUE_COLUMN)};
265 268
266   - public static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN,};
267 269 public static final String[] MIN_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
268 270 new String[]{min(LONG_VALUE_COLUMN), min(DOUBLE_VALUE_COLUMN), min(BOOLEAN_VALUE_COLUMN), min(STRING_VALUE_COLUMN)});
269 271 public static final String[] MAX_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
270 272 new String[]{max(LONG_VALUE_COLUMN), max(DOUBLE_VALUE_COLUMN), max(BOOLEAN_VALUE_COLUMN), max(STRING_VALUE_COLUMN)});
271 273 public static final String[] SUM_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS,
272 274 new String[]{sum(LONG_VALUE_COLUMN), sum(DOUBLE_VALUE_COLUMN)});
273   - public static final String[] AVG_AGGREGATION_COLUMNS = ArrayUtils.addAll(COUNT_AGGREGATION_COLUMNS, SUM_AGGREGATION_COLUMNS);
  275 + public static final String[] AVG_AGGREGATION_COLUMNS = SUM_AGGREGATION_COLUMNS;
274 276
275 277 public static String min(String s) {
276 278 return "min(" + s + ")";
... ... @@ -287,4 +289,23 @@ public class ModelConstants {
287 289 public static String count(String s) {
288 290 return "count(" + s + ")";
289 291 }
  292 +
  293 + public static String[] getFetchColumnNames(Aggregation aggregation) {
  294 + switch (aggregation) {
  295 + case NONE:
  296 + return NONE_AGGREGATION_COLUMNS;
  297 + case MIN:
  298 + return MIN_AGGREGATION_COLUMNS;
  299 + case MAX:
  300 + return MAX_AGGREGATION_COLUMNS;
  301 + case SUM:
  302 + return SUM_AGGREGATION_COLUMNS;
  303 + case COUNT:
  304 + return COUNT_AGGREGATION_COLUMNS;
  305 + case AVG:
  306 + return AVG_AGGREGATION_COLUMNS;
  307 + default:
  308 + throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!");
  309 + }
  310 + }
290 311 }
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.dao.timeseries;
2 17
3 18 import com.datastax.driver.core.ResultSet;
... ... @@ -84,8 +99,11 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
84 99 count += curCount;
85 100 } else if (aggregation == Aggregation.AVG || aggregation == Aggregation.SUM) {
86 101 count += curCount;
87   - dValue = dValue == null ? curDValue : dValue + curDValue;
88   - lValue = lValue == null ? curLValue : lValue + curLValue;
  102 + if (curDValue != null) {
  103 + dValue = dValue == null ? curDValue : dValue + curDValue;
  104 + } else if (curLValue != null) {
  105 + lValue = lValue == null ? curLValue : lValue + curLValue;
  106 + }
89 107 } else if (aggregation == Aggregation.MIN) {
90 108 if (curDValue != null) {
91 109 dValue = dValue == null ? curDValue : Math.min(dValue, curDValue);
... ...
1 1 /**
2 2 * Copyright © 2016-2017 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -20,6 +20,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
20 20 import com.datastax.driver.core.querybuilder.Select;
21 21 import com.google.common.base.Function;
22 22 import com.google.common.util.concurrent.AsyncFunction;
  23 +import com.google.common.util.concurrent.FutureCallback;
23 24 import com.google.common.util.concurrent.Futures;
24 25 import com.google.common.util.concurrent.ListenableFuture;
25 26 import lombok.extern.slf4j.Slf4j;
... ... @@ -51,14 +52,8 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
51 52 @Slf4j
52 53 public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
53 54
54   - @Value("${cassandra.query.max_limit_per_request}")
55   - protected Integer maxLimitPerRequest;
56   -
57   - @Value("${cassandra.query.read_result_processing_threads}")
58   - private int readResultsProcessingThreads;
59   -
60   - @Value("${cassandra.query.min_read_step}")
61   - private int minReadStep;
  55 + @Value("${cassandra.query.min_aggregation_step_ms}")
  56 + private int minAggregationStepMs;
62 57
63 58 @Value("${cassandra.query.ts_key_value_partitioning}")
64 59 private String partitioning;
... ... @@ -77,7 +72,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
77 72 @PostConstruct
78 73 public void init() {
79 74 getFetchStmt(Aggregation.NONE);
80   - readResultsProcessingExecutor = Executors.newFixedThreadPool(readResultsProcessingThreads);
  75 + readResultsProcessingExecutor = Executors.newCachedThreadPool();
81 76 Optional<TsPartitionDate> partition = TsPartitionDate.parse(partitioning);
82 77 if (partition.isPresent()) {
83 78 tsFormat = partition.get();
... ... @@ -100,33 +95,12 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
100 95 return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
101 96 }
102 97
103   -
104   - private static String[] getFetchColumnNames(Aggregation aggregation) {
105   - switch (aggregation) {
106   - case NONE:
107   - return ModelConstants.NONE_AGGREGATION_COLUMNS;
108   - case MIN:
109   - return ModelConstants.MIN_AGGREGATION_COLUMNS;
110   - case MAX:
111   - return ModelConstants.MAX_AGGREGATION_COLUMNS;
112   - case SUM:
113   - return ModelConstants.SUM_AGGREGATION_COLUMNS;
114   - case COUNT:
115   - return ModelConstants.COUNT_AGGREGATION_COLUMNS;
116   - case AVG:
117   - return ModelConstants.AVG_AGGREGATION_COLUMNS;
118   - default:
119   - throw new RuntimeException("Aggregation type: " + aggregation + " is not supported!");
120   - }
121   - }
122   -
123 98 @Override
124   - public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
  99 + public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
125 100 if (query.getAggregation() == Aggregation.NONE) {
126   - //TODO:
127   - return null;
  101 + return findAllAsyncWithLimit(entityType, entityId, query);
128 102 } else {
129   - long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minReadStep);
  103 + long step = Math.max((query.getEndTs() - query.getStartTs()) / query.getLimit(), minAggregationStepMs);
130 104 long stepTs = query.getStartTs();
131 105 List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
132 106 while (stepTs < query.getEndTs()) {
... ... @@ -143,23 +117,88 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
143 117 public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> input) {
144 118 return input.stream().filter(v -> v.isPresent()).map(v -> v.get()).collect(Collectors.toList());
145 119 }
146   - });
  120 + }, readResultsProcessingExecutor);
  121 + }
  122 + }
  123 +
  124 + private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(String entityType, UUID entityId, TsKvQuery query) {
  125 + long minPartition = query.getStartTs();
  126 + long maxPartition = query.getEndTs();
  127 +
  128 + ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
  129 +
  130 + final SimpleListenableFuture<List<TsKvEntry>> resultFuture = new SimpleListenableFuture<>();
  131 + final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
  132 +
  133 + Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
  134 + @Override
  135 + public void onSuccess(@Nullable List<Long> partitions) {
  136 + TsKvQueryCursor cursor = new TsKvQueryCursor(entityType, entityId, query, partitions);
  137 + findAllAsyncSequentiallyWithLimit(cursor, resultFuture);
  138 + }
  139 +
  140 + @Override
  141 + public void onFailure(Throwable t) {
  142 + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityType, entityId, minPartition, maxPartition, t);
  143 + }
  144 + }, readResultsProcessingExecutor);
  145 +
  146 + return resultFuture;
  147 + }
  148 +
  149 + private void findAllAsyncSequentiallyWithLimit(final TsKvQueryCursor cursor, final SimpleListenableFuture<List<TsKvEntry>> resultFuture) {
  150 + if (cursor.isFull() || !cursor.hasNextPartition()) {
  151 + resultFuture.set(cursor.getData());
  152 + } else {
  153 + PreparedStatement proto = getFetchStmt(Aggregation.NONE);
  154 + BoundStatement stmt = proto.bind();
  155 + stmt.setString(0, cursor.getEntityType());
  156 + stmt.setUUID(1, cursor.getEntityId());
  157 + stmt.setString(2, cursor.getKey());
  158 + stmt.setLong(3, cursor.getNextPartition());
  159 + stmt.setLong(4, cursor.getStartTs());
  160 + stmt.setLong(5, cursor.getEndTs());
  161 + stmt.setInt(6, cursor.getCurrentLimit());
  162 +
  163 + Futures.addCallback(executeAsyncRead(stmt), new FutureCallback<ResultSet>() {
  164 + @Override
  165 + public void onSuccess(@Nullable ResultSet result) {
  166 + cursor.addData(convertResultToTsKvEntryList(result.all()));
  167 + findAllAsyncSequentiallyWithLimit(cursor, resultFuture);
  168 + }
  169 +
  170 + @Override
  171 + public void onFailure(Throwable t) {
  172 + log.error("[{}][{}] Failed to fetch data for query {}-{}", stmt, t);
  173 + }
  174 + }, readResultsProcessingExecutor);
147 175 }
148 176 }
149 177
150 178 private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition) {
151 179 final Aggregation aggregation = query.getAggregation();
  180 + final String key = query.getKey();
152 181 final long startTs = query.getStartTs();
153 182 final long endTs = query.getEndTs();
154 183 final long ts = startTs + (endTs - startTs) / 2;
155 184
156   - ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, query.getKey(), minPartition, maxPartition);
157   - com.google.common.base.Function<ResultSet, List<Long>> toArrayFunction = rows -> rows.all().stream()
158   - .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList());
  185 + ResultSetFuture partitionsFuture = fetchPartitions(entityType, entityId, key, minPartition, maxPartition);
  186 +
  187 + ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
159 188
160   - ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, toArrayFunction, readResultsProcessingExecutor);
  189 + ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture,
  190 + getFetchChunksAsyncFunction(entityType, entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor);
161 191
162   - AsyncFunction<List<Long>, List<ResultSet>> fetchChunksFunction = partitions -> {
  192 + return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts), readResultsProcessingExecutor);
  193 + }
  194 +
  195 + private Function<ResultSet, List<Long>> getPartitionsArrayFunction() {
  196 + return rows -> rows.all().stream()
  197 + .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList());
  198 + }
  199 +
  200 + private AsyncFunction<List<Long>, List<ResultSet>> getFetchChunksAsyncFunction(String entityType, UUID entityId, String key, Aggregation aggregation, long startTs, long endTs) {
  201 + return partitions -> {
163 202 try {
164 203 PreparedStatement proto = getFetchStmt(aggregation);
165 204 List<ResultSetFuture> futures = new ArrayList<>(partitions.size());
... ... @@ -167,7 +206,7 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
167 206 BoundStatement stmt = proto.bind();
168 207 stmt.setString(0, entityType);
169 208 stmt.setUUID(1, entityId);
170   - stmt.setString(2, query.getKey());
  209 + stmt.setString(2, key);
171 210 stmt.setLong(3, partition);
172 211 stmt.setLong(4, startTs);
173 212 stmt.setLong(5, endTs);
... ... @@ -180,10 +219,6 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
180 219 throw e;
181 220 }
182 221 };
183   -
184   - ListenableFuture<List<ResultSet>> aggregationChunks = Futures.transform(partitionsListFuture, fetchChunksFunction, readResultsProcessingExecutor);
185   -
186   - return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, query.getKey(), ts), readResultsProcessingExecutor);
187 222 }
188 223
189 224 @Override
... ... @@ -320,14 +355,21 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
320 355 if (fetchStmts == null) {
321 356 fetchStmts = new PreparedStatement[Aggregation.values().length];
322 357 for (Aggregation type : Aggregation.values()) {
323   - fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
324   - String.join(", ", getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
325   - + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
326   - + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
327   - + "AND " + ModelConstants.KEY_COLUMN + " = ? "
328   - + "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
329   - + "AND " + ModelConstants.TS_COLUMN + " > ? "
330   - + "AND " + ModelConstants.TS_COLUMN + " <= ?");
  358 + if (type == Aggregation.SUM && fetchStmts[Aggregation.AVG.ordinal()] != null) {
  359 + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.AVG.ordinal()];
  360 + } else if (type == Aggregation.AVG && fetchStmts[Aggregation.SUM.ordinal()] != null) {
  361 + fetchStmts[type.ordinal()] = fetchStmts[Aggregation.SUM.ordinal()];
  362 + } else {
  363 + fetchStmts[type.ordinal()] = getSession().prepare("SELECT " +
  364 + String.join(", ", ModelConstants.getFetchColumnNames(type)) + " FROM " + ModelConstants.TS_KV_CF
  365 + + " WHERE " + ModelConstants.ENTITY_TYPE_COLUMN + " = ? "
  366 + + "AND " + ModelConstants.ENTITY_ID_COLUMN + " = ? "
  367 + + "AND " + ModelConstants.KEY_COLUMN + " = ? "
  368 + + "AND " + ModelConstants.PARTITION_COLUMN + " = ? "
  369 + + "AND " + ModelConstants.TS_COLUMN + " > ? "
  370 + + "AND " + ModelConstants.TS_COLUMN + " <= ?"
  371 + + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " DESC LIMIT ?" : ""));
  372 + }
331 373 }
332 374 }
333 375 return fetchStmts[aggType.ordinal()];
... ...
1 1 /**
2 2 * Copyright © 2016-2017 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -59,7 +59,7 @@ public class BaseTimeseriesService implements TimeseriesService {
59 59 public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
60 60 validate(entityType, entityId);
61 61 validate(query);
62   - return timeseriesDao.findAllAsync(entityType, entityId.getId(), query, timeseriesDao.toPartitionTs(query.getStartTs()), timeseriesDao.toPartitionTs(query.getEndTs()));
  62 + return timeseriesDao.findAllAsync(entityType, entityId.getId(), query);
63 63 }
64 64
65 65 @Override
... ... @@ -132,7 +132,8 @@ public class BaseTimeseriesService implements TimeseriesService {
132 132 throw new IncorrectParameterException("TsKvQuery can't be null");
133 133 } else if (isBlank(query.getKey())) {
134 134 throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
  135 + } else if (query.getAggregation() == null){
  136 + throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
135 137 }
136   - //TODO: add validation of all params
137 138 }
138 139 }
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import com.google.common.util.concurrent.AbstractFuture;
  19 +
  20 +/**
  21 + * Created by ashvayka on 21.02.17.
  22 + */
  23 +public class SimpleListenableFuture<V> extends AbstractFuture<V> {
  24 +
  25 + public SimpleListenableFuture() {
  26 +
  27 + }
  28 +
  29 + public boolean set(V value) {
  30 + return super.set(value);
  31 + }
  32 +
  33 +}
... ...
... ... @@ -33,7 +33,7 @@ public interface TimeseriesDao {
33 33
34 34 long toPartitionTs(long ts);
35 35
36   - ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query, long minPartition, long maxPartition);
  36 + ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query);
37 37
38 38 // List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
39 39
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.timeseries;
  17 +
  18 +import lombok.Data;
  19 +import lombok.Getter;
  20 +import org.thingsboard.server.common.data.kv.TsKvEntry;
  21 +import org.thingsboard.server.common.data.kv.TsKvQuery;
  22 +
  23 +import java.util.ArrayList;
  24 +import java.util.List;
  25 +import java.util.UUID;
  26 +
  27 +/**
  28 + * Created by ashvayka on 21.02.17.
  29 + */
  30 +public class TsKvQueryCursor {
  31 + @Getter
  32 + private final String entityType;
  33 + @Getter
  34 + private final UUID entityId;
  35 + @Getter
  36 + private final String key;
  37 + @Getter
  38 + private final long startTs;
  39 + @Getter
  40 + private final long endTs;
  41 + private final List<Long> partitions;
  42 + @Getter
  43 + private final List<TsKvEntry> data;
  44 +
  45 + private int partitionIndex;
  46 + private int currentLimit;
  47 +
  48 + public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List<Long> partitions) {
  49 + this.entityType = entityType;
  50 + this.entityId = entityId;
  51 + this.key = baseQuery.getKey();
  52 + this.startTs = baseQuery.getStartTs();
  53 + this.endTs = baseQuery.getEndTs();
  54 + this.partitions = partitions;
  55 + this.partitionIndex = partitions.size() - 1;
  56 + this.data = new ArrayList<>();
  57 + this.currentLimit = baseQuery.getLimit();
  58 + }
  59 +
  60 + public boolean hasNextPartition() {
  61 + return partitionIndex >= 0;
  62 + }
  63 +
  64 + public boolean isFull() {
  65 + return currentLimit <= 0;
  66 + }
  67 +
  68 + public long getNextPartition() {
  69 + long partition = partitions.get(partitionIndex);
  70 + partitionIndex--;
  71 + return partition;
  72 + }
  73 +
  74 + public int getCurrentLimit() {
  75 + return currentLimit;
  76 + }
  77 +
  78 + public void addData(List<TsKvEntry> newData) {
  79 + currentLimit -= newData.size();
  80 + data.addAll(newData);
  81 + }
  82 +}
... ...
... ... @@ -25,11 +25,11 @@ import java.util.Arrays;
25 25
26 26 @RunWith(ClasspathSuite.class)
27 27 @ClassnameFilters({
28   -// "org.thingsboard.server.dao.service.*Test",
29   -// "org.thingsboard.server.dao.kv.*Test",
30   -// "org.thingsboard.server.dao.plugin.*Test",
31   -// "org.thingsboard.server.dao.rule.*Test",
32   -// "org.thingsboard.server.dao.attributes.*Test",
  28 + "org.thingsboard.server.dao.service.*Test",
  29 + "org.thingsboard.server.dao.kv.*Test",
  30 + "org.thingsboard.server.dao.plugin.*Test",
  31 + "org.thingsboard.server.dao.rule.*Test",
  32 + "org.thingsboard.server.dao.attributes.*Test",
33 33 "org.thingsboard.server.dao.timeseries.*Test"
34 34 })
35 35 public class DaoTestSuite {
... ...
... ... @@ -51,8 +51,6 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
51 51 private static final String DOUBLE_KEY = "doubleKey";
52 52 private static final String BOOLEAN_KEY = "booleanKey";
53 53
54   - public static final int PARTITION_MINUTES = 1100;
55   -
56 54 private static final long TS = 42L;
57 55
58 56 KvEntry stringKvEntry = new StringDataEntry(STRING_KEY, "value");
... ... @@ -103,49 +101,101 @@ public class TimeseriesServiceTest extends AbstractServiceTest {
103 101 }
104 102
105 103 @Test
106   - public void testFindDeviceTsDataByQuery() throws Exception {
  104 + public void testFindDeviceTsData() throws Exception {
107 105 DeviceId deviceId = new DeviceId(UUIDs.timeBased());
108   - LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES);
109   - log.debug("Start event time is {}", localDateTime);
110   - List<TsKvEntry> entries = new ArrayList<>(PARTITION_MINUTES);
111   -
112   - for (int i = 0; i < PARTITION_MINUTES; i++) {
113   - long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli();
114   - BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry);
115   - tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get();
116   - entries.add(tsKvEntry);
117   - }
118   - log.debug("Saved all records {}", localDateTime);
119   - List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
120   - LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
121   - log.debug("Fetched records {}", localDateTime);
122   - List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
123   - assertEquals(expected.size(), list.size());
124   - assertEquals(expected, list);
125   - }
  106 + List<TsKvEntry> entries = new ArrayList<>();
  107 +
  108 + entries.add(save(deviceId, 5000, 100));
  109 + entries.add(save(deviceId, 15000, 200));
  110 +
  111 + entries.add(save(deviceId, 25000, 300));
  112 + entries.add(save(deviceId, 35000, 400));
  113 +
  114 + entries.add(save(deviceId, 45000, 500));
  115 + entries.add(save(deviceId, 55000, 600));
  116 +
  117 + List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
  118 + 60000, 3, Aggregation.NONE)).get();
  119 + assertEquals(3, list.size());
  120 + assertEquals(55000, list.get(0).getTs());
  121 + assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
  122 +
  123 + assertEquals(45000, list.get(1).getTs());
  124 + assertEquals(java.util.Optional.of(500L), list.get(1).getLongValue());
  125 +
  126 + assertEquals(35000, list.get(2).getTs());
  127 + assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
  128 +
  129 + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
  130 + 60000, 3, Aggregation.AVG)).get();
  131 + assertEquals(3, list.size());
  132 + assertEquals(10000, list.get(0).getTs());
  133 + assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
  134 +
  135 + assertEquals(30000, list.get(1).getTs());
  136 + assertEquals(java.util.Optional.of(350L), list.get(1).getLongValue());
  137 +
  138 + assertEquals(50000, list.get(2).getTs());
  139 + assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
  140 +
  141 + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
  142 + 60000, 3, Aggregation.SUM)).get();
  143 +
  144 + assertEquals(3, list.size());
  145 + assertEquals(10000, list.get(0).getTs());
  146 + assertEquals(java.util.Optional.of(300L), list.get(0).getLongValue());
  147 +
  148 + assertEquals(30000, list.get(1).getTs());
  149 + assertEquals(java.util.Optional.of(700L), list.get(1).getLongValue());
126 150
127   -// @Test
128   -// public void testFindDeviceTsDataByQuery() throws Exception {
129   -// DeviceId deviceId = new DeviceId(UUIDs.timeBased());
130   -// LocalDateTime localDateTime = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(PARTITION_MINUTES);
131   -// log.debug("Start event time is {}", localDateTime);
132   -// List<TsKvEntry> entries = new ArrayList<>(PARTITION_MINUTES);
133   -//
134   -// for (int i = 0; i < PARTITION_MINUTES; i++) {
135   -// long time = localDateTime.plusMinutes(i).toInstant(ZoneOffset.UTC).toEpochMilli();
136   -// BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(time, stringKvEntry);
137   -// tsService.save(DataConstants.DEVICE, deviceId, tsKvEntry).get();
138   -// entries.add(tsKvEntry);
139   -// }
140   -// log.debug("Saved all records {}", localDateTime);
141   -// List<TsKvEntry> list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(STRING_KEY, entries.get(599).getTs(),
142   -// LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC).toEpochMilli(), PARTITION_MINUTES - 599, Aggregation.MIN)).get();
143   -// log.debug("Fetched records {}", localDateTime);
144   -// List<TsKvEntry> expected = entries.subList(600, PARTITION_MINUTES);
145   -// assertEquals(expected.size(), list.size());
146   -// assertEquals(expected, list);
147   -// }
  151 + assertEquals(50000, list.get(2).getTs());
  152 + assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
148 153
  154 + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
  155 + 60000, 3, Aggregation.MIN)).get();
  156 +
  157 + assertEquals(3, list.size());
  158 + assertEquals(10000, list.get(0).getTs());
  159 + assertEquals(java.util.Optional.of(100L), list.get(0).getLongValue());
  160 +
  161 + assertEquals(30000, list.get(1).getTs());
  162 + assertEquals(java.util.Optional.of(300L), list.get(1).getLongValue());
  163 +
  164 + assertEquals(50000, list.get(2).getTs());
  165 + assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
  166 +
  167 + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
  168 + 60000, 3, Aggregation.MAX)).get();
  169 +
  170 + assertEquals(3, list.size());
  171 + assertEquals(10000, list.get(0).getTs());
  172 + assertEquals(java.util.Optional.of(200L), list.get(0).getLongValue());
  173 +
  174 + assertEquals(30000, list.get(1).getTs());
  175 + assertEquals(java.util.Optional.of(400L), list.get(1).getLongValue());
  176 +
  177 + assertEquals(50000, list.get(2).getTs());
  178 + assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
  179 +
  180 + list = tsService.findAll(DataConstants.DEVICE, deviceId, new BaseTsKvQuery(LONG_KEY, 0,
  181 + 60000, 3, Aggregation.COUNT)).get();
  182 +
  183 + assertEquals(3, list.size());
  184 + assertEquals(10000, list.get(0).getTs());
  185 + assertEquals(java.util.Optional.of(2L), list.get(0).getLongValue());
  186 +
  187 + assertEquals(30000, list.get(1).getTs());
  188 + assertEquals(java.util.Optional.of(2L), list.get(1).getLongValue());
  189 +
  190 + assertEquals(50000, list.get(2).getTs());
  191 + assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
  192 + }
  193 +
  194 + private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception {
  195 + TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value));
  196 + tsService.save(DataConstants.DEVICE, deviceId, entry).get();
  197 + return entry;
  198 + }
149 199
150 200 private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
151 201 tsService.save(DataConstants.DEVICE, deviceId, toTsEntry(ts, stringKvEntry)).get();
... ...
... ... @@ -48,6 +48,4 @@ cassandra.query.ts_key_value_partitioning=HOURS
48 48
49 49 cassandra.query.max_limit_per_request=1000
50 50
51   -cassandra.query.read_result_processing_threads=3
52   -
53   -cassandra.query.min_read_step=100
\ No newline at end of file
  51 +cassandra.query.min_aggregation_step_ms=100
\ No newline at end of file
... ...