Commit 7358b4a79b540c7533c75792df01280bc1fe28c7

Authored by Andrew Shvayka
1 parent 66c0e717

Implemented MIN/MAX/AVG/SUM/COUNT calculation for mixed number data types

... ... @@ -52,22 +52,33 @@ public final class TsKvEntity implements ToData<TsKvEntry> {
52 52 public TsKvEntity() {
53 53 }
54 54
55   - public TsKvEntity(Double avgLongValue, Double avgDoubleValue) {
56   - if(avgLongValue != null) {
57   - this.longValue = avgLongValue.longValue();
  55 + public TsKvEntity(Long longSumValue, Double doubleSumValue, Long longCountValue, Long doubleCountValue) {
  56 + double sum = 0.0;
  57 + if (longSumValue != null) {
  58 + sum += longSumValue;
58 59 }
59   - this.doubleValue = avgDoubleValue;
  60 + if (doubleSumValue != null) {
  61 + sum += doubleSumValue;
  62 + }
  63 + this.doubleValue = sum / (longCountValue + doubleCountValue);
60 64 }
61 65
62 66 public TsKvEntity(Long sumLongValue, Double sumDoubleValue) {
63   - this.longValue = sumLongValue;
64   - this.doubleValue = sumDoubleValue;
  67 + if (sumDoubleValue != null) {
  68 + this.doubleValue = sumDoubleValue + (sumLongValue != null ? sumLongValue.doubleValue() : 0.0);
  69 + } else {
  70 + this.longValue = sumLongValue;
  71 + }
65 72 }
66 73
67   - public TsKvEntity(String strValue, Long longValue, Double doubleValue) {
  74 + public TsKvEntity(String strValue, Long longValue, Double doubleValue, boolean max) {
68 75 this.strValue = strValue;
69   - this.longValue = longValue;
70   - this.doubleValue = doubleValue;
  76 + if (longValue != null && doubleValue != null) {
  77 + this.doubleValue = max ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue());
  78 + } else {
  79 + this.longValue = longValue;
  80 + this.doubleValue = doubleValue;
  81 + }
71 82 }
72 83
73 84 public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) {
... ... @@ -75,10 +86,8 @@ public final class TsKvEntity implements ToData<TsKvEntry> {
75 86 this.longValue = booleanValueCount;
76 87 } else if (strValueCount != 0) {
77 88 this.longValue = strValueCount;
78   - } else if (longValueCount != 0) {
79   - this.longValue = longValueCount;
80   - } else if (doubleValueCount != 0) {
81   - this.longValue = doubleValueCount;
  89 + } else {
  90 + this.longValue = longValueCount + doubleValueCount;
82 91 }
83 92 }
84 93
... ...
... ... @@ -55,7 +55,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
55 55 @Param("endTs") long endTs);
56 56
57 57 @Async
58   - @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
  58 + @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue), true) FROM TsKvEntity tskv " +
59 59 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
60 60 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
61 61 CompletableFuture<TsKvEntity> findMax(@Param("entityId") String entityId,
... ... @@ -65,7 +65,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
65 65 @Param("endTs") long endTs);
66 66
67 67 @Async
68   - @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " +
  68 + @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue), false) FROM TsKvEntity tskv " +
69 69 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
70 70 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
71 71 CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
... ... @@ -85,7 +85,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
85 85 @Param("endTs") long endTs);
86 86
87 87 @Async
88   - @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
  88 + @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
89 89 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
90 90 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
91 91 CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
... ...
... ... @@ -98,6 +98,7 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
98 98 curLValue = getLongValue(row);
99 99 }
100 100 if (doubleCount > 0) {
  101 + aggResult.hasDouble = true;
101 102 aggResult.dataType = DataType.DOUBLE;
102 103 curCount += doubleCount;
103 104 curDValue = getDoubleValue(row);
... ... @@ -222,17 +223,25 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
222 223 if (aggResult.count == 0 || (aggResult.dataType == DataType.DOUBLE && aggResult.dValue == null) || (aggResult.dataType == DataType.LONG && aggResult.lValue == null)) {
223 224 return Optional.empty();
224 225 } else if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) {
225   - double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L);
226   - return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count))));
  226 + if(aggregation == Aggregation.AVG || aggResult.hasDouble) {
  227 + double sum = Optional.ofNullable(aggResult.dValue).orElse(0.0d) + Optional.ofNullable(aggResult.lValue).orElse(0L);
  228 + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.SUM ? sum : (sum / aggResult.count))));
  229 + } else {
  230 + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggregation == Aggregation.SUM ? aggResult.lValue : (aggResult.lValue / aggResult.count))));
  231 + }
227 232 }
228 233 return Optional.empty();
229 234 }
230 235
231 236 private Optional<TsKvEntry> processMinOrMaxResult(AggregationResult aggResult) {
232 237 if (aggResult.dataType == DataType.DOUBLE || aggResult.dataType == DataType.LONG) {
233   - double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE);
234   - double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE);
235   - return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL))));
  238 + if(aggResult.hasDouble) {
  239 + double currentD = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.dValue).orElse(Double.MAX_VALUE) : Optional.ofNullable(aggResult.dValue).orElse(Double.MIN_VALUE);
  240 + double currentL = aggregation == Aggregation.MIN ? Optional.ofNullable(aggResult.lValue).orElse(Long.MAX_VALUE) : Optional.ofNullable(aggResult.lValue).orElse(Long.MIN_VALUE);
  241 + return Optional.of(new BasicTsKvEntry(ts, new DoubleDataEntry(key, aggregation == Aggregation.MIN ? Math.min(currentD, currentL) : Math.max(currentD, currentL))));
  242 + } else {
  243 + return Optional.of(new BasicTsKvEntry(ts, new LongDataEntry(key, aggResult.lValue)));
  244 + }
236 245 } else if (aggResult.dataType == DataType.STRING) {
237 246 return Optional.of(new BasicTsKvEntry(ts, new StringDataEntry(key, aggResult.sValue)));
238 247 } else {
... ... @@ -247,5 +256,6 @@ public class AggregatePartitionsFunction implements com.google.common.base.Funct
247 256 Double dValue = null;
248 257 Long lValue = null;
249 258 long count = 0;
  259 + boolean hasDouble = false;
250 260 }
251 261 }
... ...
... ... @@ -25,9 +25,7 @@ import java.util.Arrays;
25 25
26 26 @RunWith(ClasspathSuite.class)
27 27 @ClassnameFilters({
28   - "org.thingsboard.server.dao.service.*ServiceNoSqlTest",
29   - "org.thingsboard.server.dao.service.queue.cassandra.*.*.*Test",
30   - "org.thingsboard.server.dao.service.queue.cassandra.*Test"
  28 + "org.thingsboard.server.dao.service.*ServiceNoSqlTest"
31 29 })
32 30 public class NoSqlDaoServiceTestSuite {
33 31
... ...
... ... @@ -221,13 +221,13 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
221 221 60000, 20000, 3, Aggregation.AVG))).get();
222 222 assertEquals(3, list.size());
223 223 assertEquals(10000, list.get(0).getTs());
224   - assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
  224 + assertEquals(java.util.Optional.of(150.0), list.get(0).getDoubleValue());
225 225
226 226 assertEquals(30000, list.get(1).getTs());
227   - assertEquals(java.util.Optional.of(350L), list.get(1).getLongValue());
  227 + assertEquals(java.util.Optional.of(350.0), list.get(1).getDoubleValue());
228 228
229 229 assertEquals(50000, list.get(2).getTs());
230   - assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
  230 + assertEquals(java.util.Optional.of(550.0), list.get(2).getDoubleValue());
231 231
232 232 list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
233 233 60000, 20000, 3, Aggregation.SUM))).get();
... ... @@ -282,12 +282,110 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
282 282 assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
283 283 }
284 284
  285 + @Test
  286 + public void testFindDeviceLongAndDoubleTsData() throws Exception {
  287 + DeviceId deviceId = new DeviceId(UUIDs.timeBased());
  288 + List<TsKvEntry> entries = new ArrayList<>();
  289 +
  290 + entries.add(save(deviceId, 5000, 100));
  291 + entries.add(save(deviceId, 15000, 200.0));
  292 +
  293 + entries.add(save(deviceId, 25000, 300));
  294 + entries.add(save(deviceId, 35000, 400.0));
  295 +
  296 + entries.add(save(deviceId, 45000, 500));
  297 + entries.add(save(deviceId, 55000, 600.0));
  298 +
  299 + List<TsKvEntry> list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  300 + 60000, 20000, 3, Aggregation.NONE))).get();
  301 + assertEquals(3, list.size());
  302 + assertEquals(55000, list.get(0).getTs());
  303 + assertEquals(java.util.Optional.of(600.0), list.get(0).getDoubleValue());
  304 +
  305 + assertEquals(45000, list.get(1).getTs());
  306 + assertEquals(java.util.Optional.of(500L), list.get(1).getLongValue());
  307 +
  308 + assertEquals(35000, list.get(2).getTs());
  309 + assertEquals(java.util.Optional.of(400.0), list.get(2).getDoubleValue());
  310 +
  311 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  312 + 60000, 20000, 3, Aggregation.AVG))).get();
  313 + assertEquals(3, list.size());
  314 + assertEquals(10000, list.get(0).getTs());
  315 + assertEquals(java.util.Optional.of(150.0), list.get(0).getDoubleValue());
  316 +
  317 + assertEquals(30000, list.get(1).getTs());
  318 + assertEquals(java.util.Optional.of(350.0), list.get(1).getDoubleValue());
  319 +
  320 + assertEquals(50000, list.get(2).getTs());
  321 + assertEquals(java.util.Optional.of(550.0), list.get(2).getDoubleValue());
  322 +
  323 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  324 + 60000, 20000, 3, Aggregation.SUM))).get();
  325 +
  326 + assertEquals(3, list.size());
  327 + assertEquals(10000, list.get(0).getTs());
  328 + assertEquals(java.util.Optional.of(300.0), list.get(0).getDoubleValue());
  329 +
  330 + assertEquals(30000, list.get(1).getTs());
  331 + assertEquals(java.util.Optional.of(700.0), list.get(1).getDoubleValue());
  332 +
  333 + assertEquals(50000, list.get(2).getTs());
  334 + assertEquals(java.util.Optional.of(1100.0), list.get(2).getDoubleValue());
  335 +
  336 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  337 + 60000, 20000, 3, Aggregation.MIN))).get();
  338 +
  339 + assertEquals(3, list.size());
  340 + assertEquals(10000, list.get(0).getTs());
  341 + assertEquals(java.util.Optional.of(100.0), list.get(0).getDoubleValue());
  342 +
  343 + assertEquals(30000, list.get(1).getTs());
  344 + assertEquals(java.util.Optional.of(300.0), list.get(1).getDoubleValue());
  345 +
  346 + assertEquals(50000, list.get(2).getTs());
  347 + assertEquals(java.util.Optional.of(500.0), list.get(2).getDoubleValue());
  348 +
  349 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  350 + 60000, 20000, 3, Aggregation.MAX))).get();
  351 +
  352 + assertEquals(3, list.size());
  353 + assertEquals(10000, list.get(0).getTs());
  354 + assertEquals(java.util.Optional.of(200.0), list.get(0).getDoubleValue());
  355 +
  356 + assertEquals(30000, list.get(1).getTs());
  357 + assertEquals(java.util.Optional.of(400.0), list.get(1).getDoubleValue());
  358 +
  359 + assertEquals(50000, list.get(2).getTs());
  360 + assertEquals(java.util.Optional.of(600.0), list.get(2).getDoubleValue());
  361 +
  362 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0,
  363 + 60000, 20000, 3, Aggregation.COUNT))).get();
  364 +
  365 + assertEquals(3, list.size());
  366 + assertEquals(10000, list.get(0).getTs());
  367 + assertEquals(java.util.Optional.of(2L), list.get(0).getLongValue());
  368 +
  369 + assertEquals(30000, list.get(1).getTs());
  370 + assertEquals(java.util.Optional.of(2L), list.get(1).getLongValue());
  371 +
  372 + assertEquals(50000, list.get(2).getTs());
  373 + assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
  374 + }
  375 +
285 376 private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception {
286 377 TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value));
287 378 tsService.save(tenantId, deviceId, entry).get();
288 379 return entry;
289 380 }
290 381
  382 + private TsKvEntry save(DeviceId deviceId, long ts, double value) throws Exception {
  383 + TsKvEntry entry = new BasicTsKvEntry(ts, new DoubleDataEntry(LONG_KEY, value));
  384 + tsService.save(tenantId, deviceId, entry).get();
  385 + return entry;
  386 + }
  387 +
  388 +
291 389 private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
292 390 tsService.save(tenantId, deviceId, toTsEntry(ts, stringKvEntry)).get();
293 391 tsService.save(tenantId, deviceId, toTsEntry(ts, longKvEntry)).get();
... ...