Commit e0f753025d15143b023260cc26a230c1c0e6fe57

Authored by Andrew Shvayka
1 parent 4525174b

Fixed JPA Async methods

... ... @@ -18,11 +18,13 @@ package org.thingsboard.server;
18 18 import org.springframework.boot.SpringApplication;
19 19 import org.springframework.boot.SpringBootConfiguration;
20 20 import org.springframework.context.annotation.ComponentScan;
  21 +import org.springframework.scheduling.annotation.EnableAsync;
21 22 import springfox.documentation.swagger2.annotations.EnableSwagger2;
22 23
23 24 import java.util.Arrays;
24 25
25 26 @SpringBootConfiguration
  27 +@EnableAsync
26 28 @EnableSwagger2
27 29 @ComponentScan({"org.thingsboard.server"})
28 30 public class ThingsboardServerApplication {
... ...
... ... @@ -19,6 +19,7 @@ import com.google.common.base.Function;
19 19 import com.google.common.collect.Lists;
20 20 import com.google.common.util.concurrent.Futures;
21 21 import com.google.common.util.concurrent.ListenableFuture;
  22 +import com.google.common.util.concurrent.SettableFuture;
22 23 import lombok.extern.slf4j.Slf4j;
23 24 import org.springframework.beans.factory.annotation.Autowired;
24 25 import org.springframework.data.domain.PageRequest;
... ... @@ -39,6 +40,8 @@ import org.thingsboard.server.dao.util.SqlDao;
39 40 import javax.annotation.Nullable;
40 41 import java.util.ArrayList;
41 42 import java.util.List;
  43 +import java.util.Optional;
  44 +import java.util.concurrent.CompletableFuture;
42 45 import java.util.stream.Collectors;
43 46
44 47 import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
... ... @@ -80,7 +83,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
80 83 return findAllAsyncWithLimit(entityId, query);
81 84 } else {
82 85 long stepTs = query.getStartTs();
83   - List<ListenableFuture<TsKvEntry>> futures = new ArrayList<>();
  86 + List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
84 87 while (stepTs < query.getEndTs()) {
85 88 long startTs = stepTs;
86 89 long endTs = stepTs + query.getInterval();
... ... @@ -88,16 +91,30 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
88 91 futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
89 92 stepTs = endTs;
90 93 }
91   - return Futures.allAsList(futures);
  94 + ListenableFuture<List<Optional<TsKvEntry>>> future = Futures.allAsList(futures);
  95 + return Futures.transform(future, new Function<List<Optional<TsKvEntry>>, List<TsKvEntry>>() {
  96 + @Nullable
  97 + @Override
  98 + public List<TsKvEntry> apply(@Nullable List<Optional<TsKvEntry>> results) {
  99 + if (results == null || results.isEmpty()) {
  100 + return null;
  101 + }
  102 + return results.stream()
  103 + .filter(Optional::isPresent)
  104 + .map(Optional::get)
  105 + .collect(Collectors.toList());
  106 + }
  107 + }, service);
92 108 }
93 109 }
94 110
95   - private ListenableFuture<TsKvEntry> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
96   - TsKvEntity entity;
  111 + private ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
  112 + CompletableFuture<TsKvEntity> entity;
  113 + String entityIdStr = fromTimeUUID(entityId.getId());
97 114 switch (aggregation) {
98 115 case AVG:
99 116 entity = tsKvRepository.findAvg(
100   - fromTimeUUID(entityId.getId()),
  117 + entityIdStr,
101 118 entityId.getEntityType(),
102 119 key,
103 120 startTs,
... ... @@ -106,7 +123,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
106 123 break;
107 124 case MAX:
108 125 entity = tsKvRepository.findMax(
109   - fromTimeUUID(entityId.getId()),
  126 + entityIdStr,
110 127 entityId.getEntityType(),
111 128 key,
112 129 startTs,
... ... @@ -115,7 +132,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
115 132 break;
116 133 case MIN:
117 134 entity = tsKvRepository.findMin(
118   - fromTimeUUID(entityId.getId()),
  135 + entityIdStr,
119 136 entityId.getEntityType(),
120 137 key,
121 138 startTs,
... ... @@ -124,7 +141,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
124 141 break;
125 142 case SUM:
126 143 entity = tsKvRepository.findSum(
127   - fromTimeUUID(entityId.getId()),
  144 + entityIdStr,
128 145 entityId.getEntityType(),
129 146 key,
130 147 startTs,
... ... @@ -133,7 +150,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
133 150 break;
134 151 case COUNT:
135 152 entity = tsKvRepository.findCount(
136   - fromTimeUUID(entityId.getId()),
  153 + entityIdStr,
137 154 entityId.getEntityType(),
138 155 key,
139 156 startTs,
... ... @@ -141,12 +158,32 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
141 158
142 159 break;
143 160 default:
144   - entity = null;
  161 + throw new IllegalArgumentException("Not supported aggregation type: " + aggregation);
145 162 }
146   - if (entity != null) {
147   - entity.setTs(ts);
148   - }
149   - return service.submit(() -> DaoUtil.getData(entity));
  163 +
  164 + SettableFuture<TsKvEntity> listenableFuture = SettableFuture.create();
  165 + entity.whenComplete((tsKvEntity, throwable) -> {
  166 + if (throwable != null) {
  167 + listenableFuture.setException(throwable);
  168 + } else {
  169 + listenableFuture.set(tsKvEntity);
  170 + }
  171 + });
  172 + return Futures.transform(listenableFuture, new Function<TsKvEntity, Optional<TsKvEntry>>() {
  173 + @Nullable
  174 + @Override
  175 + public Optional<TsKvEntry> apply(@Nullable TsKvEntity entity) {
  176 + if (entity != null && entity.isNotEmpty()) {
  177 + entity.setEntityId(entityIdStr);
  178 + entity.setEntityType(entityId.getEntityType());
  179 + entity.setKey(key);
  180 + entity.setTs(ts);
  181 + return Optional.of(DaoUtil.getData(entity));
  182 + } else {
  183 + return Optional.empty();
  184 + }
  185 + }
  186 + });
150 187 }
151 188
152 189 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
... ...
... ... @@ -26,7 +26,7 @@ import org.thingsboard.server.dao.model.sql.TsKvEntity;
26 26 import org.thingsboard.server.dao.util.SqlDao;
27 27
28 28 import java.util.List;
29   -import java.util.concurrent.Future;
  29 +import java.util.concurrent.CompletableFuture;
30 30
31 31 @SqlDao
32 32 public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
... ... @@ -45,17 +45,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
45 45 @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
46 46 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
47 47 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
48   - Future<TsKvEntity> findMax(@Param("entityId") String entityId,
49   - @Param("entityType") EntityType entityType,
50   - @Param("entityKey") String entityKey,
51   - @Param("startTs") long startTs,
52   - @Param("endTs") long endTs);
  48 + CompletableFuture<TsKvEntity> findMax(@Param("entityId") String entityId,
  49 + @Param("entityType") EntityType entityType,
  50 + @Param("entityKey") String entityKey,
  51 + @Param("startTs") long startTs,
  52 + @Param("endTs") long endTs);
53 53
54 54 @Async
55 55 @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " +
56 56 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
57 57 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
58   - Future<TsKvEntity> findMin(@Param("entityId") String entityId,
  58 + CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
59 59 @Param("entityType") EntityType entityType,
60 60 @Param("entityKey") String entityKey,
61 61 @Param("startTs") long startTs,
... ... @@ -65,7 +65,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
65 65 @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
66 66 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
67 67 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
68   - Future<TsKvEntity> findCount(@Param("entityId") String entityId,
  68 + CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
69 69 @Param("entityType") EntityType entityType,
70 70 @Param("entityKey") String entityKey,
71 71 @Param("startTs") long startTs,
... ... @@ -75,7 +75,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
75 75 @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
76 76 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
77 77 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
78   - Future<TsKvEntity> findAvg(@Param("entityId") String entityId,
  78 + CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
79 79 @Param("entityType") EntityType entityType,
80 80 @Param("entityKey") String entityKey,
81 81 @Param("startTs") long startTs,
... ... @@ -86,7 +86,7 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
86 86 @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " +
87 87 "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
88 88 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
89   - Future<TsKvEntity> findSum(@Param("entityId") String entityId,
  89 + CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
90 90 @Param("entityType") EntityType entityType,
91 91 @Param("entityKey") String entityKey,
92 92 @Param("startTs") long startTs,
... ...