Commit 90c3a2b533797af7e523acb54c3c2acc59c05c9c

Authored by Igor Kulikov
1 parent 7358b4a7

Improve Jpa Timeseries DAO. Fixed SQL Warning Code: -1003. Issue #925, Issue #397.

... ... @@ -49,35 +49,52 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
49 49 @IdClass(TsKvCompositeKey.class)
50 50 public final class TsKvEntity implements ToData<TsKvEntry> {
51 51
52   - public TsKvEntity() {
53   - }
  52 + private static final String SUM = "SUM";
  53 + private static final String AVG = "AVG";
  54 + private static final String MIN = "MIN";
  55 + private static final String MAX = "MAX";
54 56
55   - public TsKvEntity(Long longSumValue, Double doubleSumValue, Long longCountValue, Long doubleCountValue) {
56   - double sum = 0.0;
57   - if (longSumValue != null) {
58   - sum += longSumValue;
59   - }
60   - if (doubleSumValue != null) {
61   - sum += doubleSumValue;
62   - }
63   - this.doubleValue = sum / (longCountValue + doubleCountValue);
  57 + public TsKvEntity() {
64 58 }
65 59
66   - public TsKvEntity(Long sumLongValue, Double sumDoubleValue) {
67   - if (sumDoubleValue != null) {
68   - this.doubleValue = sumDoubleValue + (sumLongValue != null ? sumLongValue.doubleValue() : 0.0);
69   - } else {
70   - this.longValue = sumLongValue;
71   - }
  60 + public TsKvEntity(String strValue) {
  61 + this.strValue = strValue;
72 62 }
73 63
74   - public TsKvEntity(String strValue, Long longValue, Double doubleValue, boolean max) {
75   - this.strValue = strValue;
76   - if (longValue != null && doubleValue != null) {
77   - this.doubleValue = max ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue());
78   - } else {
79   - this.longValue = longValue;
80   - this.doubleValue = doubleValue;
  64 + public TsKvEntity(Long longValue, Double doubleValue, Long longCountValue, Long doubleCountValue, String aggType) {
  65 + switch (aggType) {
  66 + case AVG:
  67 + double sum = 0.0;
  68 + if (longValue != null) {
  69 + sum += longValue;
  70 + }
  71 + if (doubleValue != null) {
  72 + sum += doubleValue;
  73 + }
  74 + long totalCount = longCountValue + doubleCountValue;
  75 + if (totalCount > 0) {
  76 + this.doubleValue = sum / (longCountValue + doubleCountValue);
  77 + } else {
  78 + this.doubleValue = 0.0;
  79 + }
  80 + break;
  81 + case SUM:
  82 + if (doubleCountValue > 0) {
  83 + this.doubleValue = doubleValue + (longValue != null ? longValue.doubleValue() : 0.0);
  84 + } else {
  85 + this.longValue = longValue;
  86 + }
  87 + break;
  88 + case MIN:
  89 + case MAX:
  90 + if (longCountValue > 0 && doubleCountValue > 0) {
  91 + this.doubleValue = MAX.equals(aggType) ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue());
  92 + } else if (doubleCountValue > 0) {
  93 + this.doubleValue = doubleValue;
  94 + } else if (longCountValue > 0) {
  95 + this.longValue = longValue;
  96 + }
  97 + break;
81 98 }
82 99 }
83 100
... ...
... ... @@ -161,52 +161,62 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
161 161 }
162 162
163 163 private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
164   - CompletableFuture<TsKvEntity> entity;
  164 + List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
165 165 String entityIdStr = fromTimeUUID(entityId.getId());
166 166 switch (aggregation) {
167 167 case AVG:
168   - entity = tsKvRepository.findAvg(
  168 + entitiesFutures.add(tsKvRepository.findAvg(
169 169 entityIdStr,
170 170 entityId.getEntityType(),
171 171 key,
172 172 startTs,
173   - endTs);
  173 + endTs));
174 174
175 175 break;
176 176 case MAX:
177   - entity = tsKvRepository.findMax(
  177 + entitiesFutures.add(tsKvRepository.findStringMax(
178 178 entityIdStr,
179 179 entityId.getEntityType(),
180 180 key,
181 181 startTs,
182   - endTs);
  182 + endTs));
  183 + entitiesFutures.add(tsKvRepository.findNumericMax(
  184 + entityIdStr,
  185 + entityId.getEntityType(),
  186 + key,
  187 + startTs,
  188 + endTs));
183 189
184 190 break;
185 191 case MIN:
186   - entity = tsKvRepository.findMin(
  192 + entitiesFutures.add(tsKvRepository.findStringMin(
187 193 entityIdStr,
188 194 entityId.getEntityType(),
189 195 key,
190 196 startTs,
191   - endTs);
192   -
  197 + endTs));
  198 + entitiesFutures.add(tsKvRepository.findNumericMin(
  199 + entityIdStr,
  200 + entityId.getEntityType(),
  201 + key,
  202 + startTs,
  203 + endTs));
193 204 break;
194 205 case SUM:
195   - entity = tsKvRepository.findSum(
  206 + entitiesFutures.add(tsKvRepository.findSum(
196 207 entityIdStr,
197 208 entityId.getEntityType(),
198 209 key,
199 210 startTs,
200   - endTs);
201   -
  211 + endTs));
202 212 break;
203 213 case COUNT:
204   - entity = tsKvRepository.findCount(
  214 + entitiesFutures.add(tsKvRepository.findCount(
205 215 entityIdStr,
206 216 entityId.getEntityType(),
207 217 key,
208 218 startTs,
209   - endTs);
  219 + endTs));
210 220
211 221 break;
212 222 default:
... ... @@ -214,11 +224,27 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
214 224 }
215 225
216 226 SettableFuture<TsKvEntity> listenableFuture = SettableFuture.create();
217   - entity.whenComplete((tsKvEntity, throwable) -> {
  227 +
  228 +
  229 + CompletableFuture<List<TsKvEntity>> entities =
  230 + CompletableFuture.allOf(entitiesFutures.toArray(new CompletableFuture[entitiesFutures.size()]))
  231 + .thenApply(v -> entitiesFutures.stream()
  232 + .map(CompletableFuture::join)
  233 + .collect(Collectors.toList()));
  234 +
  235 +
  236 + entities.whenComplete((tsKvEntities, throwable) -> {
218 237 if (throwable != null) {
219 238 listenableFuture.setException(throwable);
220 239 } else {
221   - listenableFuture.set(tsKvEntity);
  240 + TsKvEntity result = null;
  241 + for (TsKvEntity entity : tsKvEntities) {
  242 + if (entity.isNotEmpty()) {
  243 + result = entity;
  244 + break;
  245 + }
  246 + }
  247 + listenableFuture.set(result);
222 248 }
223 249 });
224 250 return Futures.transform(listenableFuture, new Function<TsKvEntity, Optional<TsKvEntry>>() {
... ...
... ... @@ -35,7 +35,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
35 35
36 36 @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
37 37 "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
38   - "AND tskv.ts > :startTs AND tskv.ts < :endTs")
  38 + "AND tskv.ts > :startTs AND tskv.ts <= :endTs")
39 39 List<TsKvEntity> findAllWithLimit(@Param("entityId") String entityId,
40 40 @Param("entityType") EntityType entityType,
41 41 @Param("entityKey") String key,
... ... @@ -55,29 +55,63 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
55 55 @Param("endTs") long endTs);
56 56
57 57 @Async
58   - @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue), true) FROM TsKvEntity tskv " +
  58 + @Query("SELECT new TsKvEntity(MAX(tskv.strValue)) FROM TsKvEntity tskv " +
  59 + "WHERE tskv.strValue IS NOT NULL " +
  60 + "AND tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  61 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
  62 + CompletableFuture<TsKvEntity> findStringMax(@Param("entityId") String entityId,
  63 + @Param("entityType") EntityType entityType,
  64 + @Param("entityKey") String entityKey,
  65 + @Param("startTs") long startTs,
  66 + @Param("endTs") long endTs);
  67 +
  68 + @Async
  69 + @Query("SELECT new TsKvEntity(MAX(COALESCE(tskv.longValue, -9223372036854775807)), " +
  70 + "MAX(COALESCE(tskv.doubleValue, -1.79769E+308)), " +
  71 + "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
  72 + "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
  73 + "'MAX') FROM TsKvEntity tskv " +
59 74 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
60   - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
61   - CompletableFuture<TsKvEntity> findMax(@Param("entityId") String entityId,
  75 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
  76 + CompletableFuture<TsKvEntity> findNumericMax(@Param("entityId") String entityId,
  77 + @Param("entityType") EntityType entityType,
  78 + @Param("entityKey") String entityKey,
  79 + @Param("startTs") long startTs,
  80 + @Param("endTs") long endTs);
  81 +
  82 +
  83 + @Async
  84 + @Query("SELECT new TsKvEntity(MIN(tskv.strValue)) FROM TsKvEntity tskv " +
  85 + "WHERE tskv.strValue IS NOT NULL " +
  86 + "AND tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  87 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
  88 + CompletableFuture<TsKvEntity> findStringMin(@Param("entityId") String entityId,
62 89 @Param("entityType") EntityType entityType,
63 90 @Param("entityKey") String entityKey,
64 91 @Param("startTs") long startTs,
65 92 @Param("endTs") long endTs);
66 93
67 94 @Async
68   - @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue), false) FROM TsKvEntity tskv " +
  95 + @Query("SELECT new TsKvEntity(MIN(COALESCE(tskv.longValue, 9223372036854775807)), " +
  96 + "MIN(COALESCE(tskv.doubleValue, 1.79769E+308)), " +
  97 + "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
  98 + "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
  99 + "'MIN') FROM TsKvEntity tskv " +
69 100 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
70   - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
71   - CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
  101 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
  102 + CompletableFuture<TsKvEntity> findNumericMin(@Param("entityId") String entityId,
72 103 @Param("entityType") EntityType entityType,
73 104 @Param("entityKey") String entityKey,
74 105 @Param("startTs") long startTs,
75 106 @Param("endTs") long endTs);
76 107
77 108 @Async
78   - @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
  109 + @Query("SELECT new TsKvEntity(SUM(CASE WHEN tskv.booleanValue IS NULL THEN 0 ELSE 1 END), " +
  110 + "SUM(CASE WHEN tskv.strValue IS NULL THEN 0 ELSE 1 END), " +
  111 + "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
  112 + "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " +
79 113 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
80   - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
  114 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
81 115 CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
82 116 @Param("entityType") EntityType entityType,
83 117 @Param("entityKey") String entityKey,
... ... @@ -85,23 +119,31 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
85 119 @Param("endTs") long endTs);
86 120
87 121 @Async
88   - @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
  122 + @Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " +
  123 + "SUM(COALESCE(tskv.doubleValue, 0.0)), " +
  124 + "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
  125 + "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
  126 + "'AVG') FROM TsKvEntity tskv " +
89 127 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
90   - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
  128 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
91 129 CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
92 130 @Param("entityType") EntityType entityType,
93 131 @Param("entityKey") String entityKey,
94 132 @Param("startTs") long startTs,
95 133 @Param("endTs") long endTs);
96 134
97   -
98 135 @Async
99   - @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " +
  136 + @Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " +
  137 + "SUM(COALESCE(tskv.doubleValue, 0.0)), " +
  138 + "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +
  139 + "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +
  140 + "'SUM') FROM TsKvEntity tskv " +
100 141 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
101   - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
  142 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
102 143 CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
103 144 @Param("entityType") EntityType entityType,
104 145 @Param("entityKey") String entityKey,
105 146 @Param("startTs") long startTs,
106 147 @Param("endTs") long endTs);
  148 +
107 149 }
... ...
... ... @@ -17,10 +17,7 @@ package org.thingsboard.server.dao.service.timeseries;
17 17
18 18 import com.datastax.driver.core.utils.UUIDs;
19 19 import lombok.extern.slf4j.Slf4j;
20   -import org.junit.After;
21   -import org.junit.Assert;
22   -import org.junit.Before;
23   -import org.junit.Test;
  20 +import org.junit.*;
24 21 import org.thingsboard.server.common.data.EntityView;
25 22 import org.thingsboard.server.common.data.Tenant;
26 23 import org.thingsboard.server.common.data.id.DeviceId;
... ... @@ -280,6 +277,66 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
280 277
281 278 assertEquals(50000, list.get(2).getTs());
282 279 assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
  280 +
  281 +
  282 + entries.add(save(deviceId, 65000, "A1"));
  283 + entries.add(save(deviceId, 75000, "A2"));
  284 + entries.add(save(deviceId, 85000, "B1"));
  285 + entries.add(save(deviceId, 95000, "B2"));
  286 + entries.add(save(deviceId, 105000, "C1"));
  287 + entries.add(save(deviceId, 115000, "C2"));
  288 +
  289 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 60000,
  290 + 120000, 20000, 3, Aggregation.NONE))).get();
  291 + assertEquals(3, list.size());
  292 + assertEquals(115000, list.get(0).getTs());
  293 + assertEquals(java.util.Optional.of("C2"), list.get(0).getStrValue());
  294 +
  295 + assertEquals(105000, list.get(1).getTs());
  296 + assertEquals(java.util.Optional.of("C1"), list.get(1).getStrValue());
  297 +
  298 + assertEquals(95000, list.get(2).getTs());
  299 + assertEquals(java.util.Optional.of("B2"), list.get(2).getStrValue());
  300 +
  301 +
  302 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 60000,
  303 + 120000, 20000, 3, Aggregation.MIN))).get();
  304 +
  305 + assertEquals(3, list.size());
  306 + assertEquals(70000, list.get(0).getTs());
  307 + assertEquals(java.util.Optional.of("A1"), list.get(0).getStrValue());
  308 +
  309 + assertEquals(90000, list.get(1).getTs());
  310 + assertEquals(java.util.Optional.of("B1"), list.get(1).getStrValue());
  311 +
  312 + assertEquals(110000, list.get(2).getTs());
  313 + assertEquals(java.util.Optional.of("C1"), list.get(2).getStrValue());
  314 +
  315 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 60000,
  316 + 120000, 20000, 3, Aggregation.MAX))).get();
  317 +
  318 + assertEquals(3, list.size());
  319 + assertEquals(70000, list.get(0).getTs());
  320 + assertEquals(java.util.Optional.of("A2"), list.get(0).getStrValue());
  321 +
  322 + assertEquals(90000, list.get(1).getTs());
  323 + assertEquals(java.util.Optional.of("B2"), list.get(1).getStrValue());
  324 +
  325 + assertEquals(110000, list.get(2).getTs());
  326 + assertEquals(java.util.Optional.of("C2"), list.get(2).getStrValue());
  327 +
  328 + list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 60000,
  329 + 120000, 20000, 3, Aggregation.COUNT))).get();
  330 +
  331 + assertEquals(3, list.size());
  332 + assertEquals(70000, list.get(0).getTs());
  333 + assertEquals(java.util.Optional.of(2L), list.get(0).getLongValue());
  334 +
  335 + assertEquals(90000, list.get(1).getTs());
  336 + assertEquals(java.util.Optional.of(2L), list.get(1).getLongValue());
  337 +
  338 + assertEquals(110000, list.get(2).getTs());
  339 + assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
283 340 }
284 341
285 342 @Test
... ... @@ -385,6 +442,12 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
385 442 return entry;
386 443 }
387 444
  445 + private TsKvEntry save(DeviceId deviceId, long ts, String value) throws Exception {
  446 + TsKvEntry entry = new BasicTsKvEntry(ts, new StringDataEntry(LONG_KEY, value));
  447 + tsService.save(tenantId, deviceId, entry).get();
  448 + return entry;
  449 + }
  450 +
388 451
389 452 private void saveEntries(DeviceId deviceId, long ts) throws ExecutionException, InterruptedException {
390 453 tsService.save(tenantId, deviceId, toTsEntry(ts, stringKvEntry)).get();
... ...