Commit bb4a51bae10ba40c735b42dc2797c8ab6cad4d34

Authored by Andrii Shvaika
1 parent 3c78d614

Fix data aggregation

@@ -33,7 +33,7 @@ public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEnt @@ -33,7 +33,7 @@ public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEnt
33 33
34 @Query("SELECT tskv FROM TimescaleTsKvEntity tskv WHERE tskv.entityId = :entityId " + 34 @Query("SELECT tskv FROM TimescaleTsKvEntity tskv WHERE tskv.entityId = :entityId " +
35 "AND tskv.key = :entityKey " + 35 "AND tskv.key = :entityKey " +
36 - "AND tskv.ts > :startTs AND tskv.ts <= :endTs") 36 + "AND tskv.ts >= :startTs AND tskv.ts < :endTs")
37 List<TimescaleTsKvEntity> findAllWithLimit( 37 List<TimescaleTsKvEntity> findAllWithLimit(
38 @Param("entityId") UUID entityId, 38 @Param("entityId") UUID entityId,
39 @Param("entityKey") int key, 39 @Param("entityKey") int key,
@@ -44,7 +44,7 @@ public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEnt @@ -44,7 +44,7 @@ public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEnt
44 @Modifying 44 @Modifying
45 @Query("DELETE FROM TimescaleTsKvEntity tskv WHERE tskv.entityId = :entityId " + 45 @Query("DELETE FROM TimescaleTsKvEntity tskv WHERE tskv.entityId = :entityId " +
46 "AND tskv.key = :entityKey " + 46 "AND tskv.key = :entityKey " +
47 - "AND tskv.ts > :startTs AND tskv.ts <= :endTs") 47 + "AND tskv.ts >= :startTs AND tskv.ts < :endTs")
48 void delete(@Param("entityId") UUID entityId, 48 void delete(@Param("entityId") UUID entityId,
49 @Param("entityKey") int key, 49 @Param("entityKey") int key,
50 @Param("startTs") long startTs, 50 @Param("startTs") long startTs,
@@ -32,7 +32,7 @@ import java.util.concurrent.CompletableFuture; @@ -32,7 +32,7 @@ import java.util.concurrent.CompletableFuture;
32 public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> { 32 public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
33 33
34 @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + 34 @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
35 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 35 + "AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
36 List<TsKvEntity> findAllWithLimit(@Param("entityId") UUID entityId, 36 List<TsKvEntity> findAllWithLimit(@Param("entityId") UUID entityId,
37 @Param("entityKey") int key, 37 @Param("entityKey") int key,
38 @Param("startTs") long startTs, 38 @Param("startTs") long startTs,
@@ -42,7 +42,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite @@ -42,7 +42,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
42 @Transactional 42 @Transactional
43 @Modifying 43 @Modifying
44 @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + 44 @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
45 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 45 + "AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
46 void delete(@Param("entityId") UUID entityId, 46 void delete(@Param("entityId") UUID entityId,
47 @Param("entityKey") int key, 47 @Param("entityKey") int key,
48 @Param("startTs") long startTs, 48 @Param("startTs") long startTs,
@@ -51,7 +51,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite @@ -51,7 +51,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
51 @Async 51 @Async
52 @Query("SELECT new TsKvEntity(MAX(tskv.strValue)) FROM TsKvEntity tskv " + 52 @Query("SELECT new TsKvEntity(MAX(tskv.strValue)) FROM TsKvEntity tskv " +
53 "WHERE tskv.strValue IS NOT NULL " + 53 "WHERE tskv.strValue IS NOT NULL " +
54 - "AND tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 54 + "AND tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
55 CompletableFuture<TsKvEntity> findStringMax(@Param("entityId") UUID entityId, 55 CompletableFuture<TsKvEntity> findStringMax(@Param("entityId") UUID entityId,
56 @Param("entityKey") int entityKey, 56 @Param("entityKey") int entityKey,
57 @Param("startTs") long startTs, 57 @Param("startTs") long startTs,
@@ -63,7 +63,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite @@ -63,7 +63,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
63 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + 63 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
64 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + 64 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
65 "'MAX') FROM TsKvEntity tskv " + 65 "'MAX') FROM TsKvEntity tskv " +
66 - "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 66 + "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
67 CompletableFuture<TsKvEntity> findNumericMax(@Param("entityId") UUID entityId, 67 CompletableFuture<TsKvEntity> findNumericMax(@Param("entityId") UUID entityId,
68 @Param("entityKey") int entityKey, 68 @Param("entityKey") int entityKey,
69 @Param("startTs") long startTs, 69 @Param("startTs") long startTs,
@@ -73,7 +73,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite @@ -73,7 +73,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
73 @Async 73 @Async
74 @Query("SELECT new TsKvEntity(MIN(tskv.strValue)) FROM TsKvEntity tskv " + 74 @Query("SELECT new TsKvEntity(MIN(tskv.strValue)) FROM TsKvEntity tskv " +
75 "WHERE tskv.strValue IS NOT NULL " + 75 "WHERE tskv.strValue IS NOT NULL " +
76 - "AND tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 76 + "AND tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
77 CompletableFuture<TsKvEntity> findStringMin(@Param("entityId") UUID entityId, 77 CompletableFuture<TsKvEntity> findStringMin(@Param("entityId") UUID entityId,
78 @Param("entityKey") int entityKey, 78 @Param("entityKey") int entityKey,
79 @Param("startTs") long startTs, 79 @Param("startTs") long startTs,
@@ -85,7 +85,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite @@ -85,7 +85,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
85 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + 85 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
86 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + 86 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
87 "'MIN') FROM TsKvEntity tskv " + 87 "'MIN') FROM TsKvEntity tskv " +
88 - "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 88 + "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
89 CompletableFuture<TsKvEntity> findNumericMin( 89 CompletableFuture<TsKvEntity> findNumericMin(
90 @Param("entityId") UUID entityId, 90 @Param("entityId") UUID entityId,
91 @Param("entityKey") int entityKey, 91 @Param("entityKey") int entityKey,
@@ -98,7 +98,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite @@ -98,7 +98,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
98 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + 98 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
99 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + 99 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
100 "SUM(CASE WHEN tskv.jsonValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " + 100 "SUM(CASE WHEN tskv.jsonValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " +
101 - "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 101 + "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
102 CompletableFuture<TsKvEntity> findCount(@Param("entityId") UUID entityId, 102 CompletableFuture<TsKvEntity> findCount(@Param("entityId") UUID entityId,
103 @Param("entityKey") int entityKey, 103 @Param("entityKey") int entityKey,
104 @Param("startTs") long startTs, 104 @Param("startTs") long startTs,
@@ -110,7 +110,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite @@ -110,7 +110,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
110 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + 110 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
111 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + 111 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
112 "'AVG') FROM TsKvEntity tskv " + 112 "'AVG') FROM TsKvEntity tskv " +
113 - "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 113 + "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
114 CompletableFuture<TsKvEntity> findAvg(@Param("entityId") UUID entityId, 114 CompletableFuture<TsKvEntity> findAvg(@Param("entityId") UUID entityId,
115 @Param("entityKey") int entityKey, 115 @Param("entityKey") int entityKey,
116 @Param("startTs") long startTs, 116 @Param("startTs") long startTs,
@@ -122,7 +122,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite @@ -122,7 +122,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
122 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " + 122 "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
123 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " + 123 "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
124 "'SUM') FROM TsKvEntity tskv " + 124 "'SUM') FROM TsKvEntity tskv " +
125 - "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 125 + "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts >= :startTs AND tskv.ts < :endTs")
126 CompletableFuture<TsKvEntity> findSum(@Param("entityId") UUID entityId, 126 CompletableFuture<TsKvEntity> findSum(@Param("entityId") UUID entityId,
127 @Param("entityKey") int entityKey, 127 @Param("entityKey") int entityKey,
128 @Param("startTs") long startTs, 128 @Param("startTs") long startTs,
@@ -550,8 +550,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @@ -550,8 +550,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
550 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM 550 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
551 + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM 551 + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
552 + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM 552 + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
553 - + "AND " + ModelConstants.TS_COLUMN + " > ? "  
554 - + "AND " + ModelConstants.TS_COLUMN + " <= ?"); 553 + + "AND " + ModelConstants.TS_COLUMN + " >= ? "
  554 + + "AND " + ModelConstants.TS_COLUMN + " < ?");
555 } 555 }
556 return deleteStmt; 556 return deleteStmt;
557 } 557 }
@@ -740,8 +740,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD @@ -740,8 +740,8 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
740 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM 740 + "AND " + ModelConstants.ENTITY_ID_COLUMN + EQUALS_PARAM
741 + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM 741 + "AND " + ModelConstants.KEY_COLUMN + EQUALS_PARAM
742 + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM 742 + "AND " + ModelConstants.PARTITION_COLUMN + EQUALS_PARAM
743 - + "AND " + ModelConstants.TS_COLUMN + " > ? "  
744 - + "AND " + ModelConstants.TS_COLUMN + " <= ?" 743 + + "AND " + ModelConstants.TS_COLUMN + " >= ? "
  744 + + "AND " + ModelConstants.TS_COLUMN + " < ?"
745 + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : "")); 745 + (type == Aggregation.NONE ? " ORDER BY " + ModelConstants.TS_COLUMN + " " + orderBy + " LIMIT ?" : ""));
746 } 746 }
747 } 747 }
@@ -300,14 +300,14 @@ export class DataAggregator { @@ -300,14 +300,14 @@ export class DataAggregator {
300 const aggKeyData = this.aggregationMap.aggMap[key]; 300 const aggKeyData = this.aggregationMap.aggMap[key];
301 let keyData = this.dataBuffer[key]; 301 let keyData = this.dataBuffer[key];
302 aggKeyData.forEach((aggData, aggTimestamp) => { 302 aggKeyData.forEach((aggData, aggTimestamp) => {
303 - if (aggTimestamp <= this.startTs) { 303 + if (aggTimestamp < this.startTs) {
304 if (this.subsTw.aggregation.stateData && 304 if (this.subsTw.aggregation.stateData &&
305 (!this.lastPrevKvPairData[key] || this.lastPrevKvPairData[key][0] < aggTimestamp)) { 305 (!this.lastPrevKvPairData[key] || this.lastPrevKvPairData[key][0] < aggTimestamp)) {
306 this.lastPrevKvPairData[key] = [aggTimestamp, aggData.aggValue]; 306 this.lastPrevKvPairData[key] = [aggTimestamp, aggData.aggValue];
307 } 307 }
308 aggKeyData.delete(aggTimestamp); 308 aggKeyData.delete(aggTimestamp);
309 this.updatedData = true; 309 this.updatedData = true;
310 - } else if (aggTimestamp <= this.endTs) { 310 + } else if (aggTimestamp < this.endTs) {
311 const kvPair: [number, any] = [aggTimestamp, aggData.aggValue]; 311 const kvPair: [number, any] = [aggTimestamp, aggData.aggValue];
312 keyData.push(kvPair); 312 keyData.push(kvPair);
313 } 313 }