Commit 5935de07b3aa94deecbd3c58fe2bc6420b033778

Authored by Volodymyr Babak
1 parent 6206d5fc

Added get ts async implementation

... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.dao.model.sql;
17 17
18 18 import lombok.Data;
  19 +import org.thingsboard.server.common.data.EntityType;
19 20 import org.thingsboard.server.common.data.kv.*;
20 21 import org.thingsboard.server.dao.model.ToData;
21 22
... ... @@ -32,8 +33,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
32 33 public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable {
33 34
34 35 @Id
  36 + @Enumerated(EnumType.STRING)
35 37 @Column(name = ENTITY_TYPE_COLUMN)
36   - private String entityType;
  38 + private EntityType entityType;
37 39
38 40 @Id
39 41 @Column(name = ENTITY_ID_COLUMN)
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.model.sql;
18 18 import lombok.AllArgsConstructor;
19 19 import lombok.Data;
20 20 import lombok.NoArgsConstructor;
  21 +import org.thingsboard.server.common.data.EntityType;
21 22
22 23 import javax.persistence.Transient;
23 24 import java.io.Serializable;
... ... @@ -31,7 +32,7 @@ public class TsKvCompositeKey implements Serializable{
31 32 @Transient
32 33 private static final long serialVersionUID = -4089175869616037523L;
33 34
34   - private String entityType;
  35 + private EntityType entityType;
35 36 private UUID entityId;
36 37 private String key;
37 38 private long ts;
... ...
... ... @@ -16,7 +16,8 @@
16 16 package org.thingsboard.server.dao.model.sql;
17 17
18 18 import lombok.Data;
19   -import org.thingsboard.server.common.data.kv.TsKvEntry;
  19 +import org.thingsboard.server.common.data.EntityType;
  20 +import org.thingsboard.server.common.data.kv.*;
20 21 import org.thingsboard.server.dao.model.ToData;
21 22
22 23 import javax.persistence.*;
... ... @@ -30,9 +31,41 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
30 31 @IdClass(TsKvCompositeKey.class)
31 32 public final class TsKvEntity implements ToData<TsKvEntry> {
32 33
  34 + public TsKvEntity() {
  35 + }
  36 +
  37 + public TsKvEntity(Double avgLongValue, Double avgDoubleValue) {
  38 + this.longValue = avgLongValue.longValue();
  39 + this.doubleValue = avgDoubleValue;
  40 + }
  41 +
  42 + public TsKvEntity(Long sumLongValue, Double sumDoubleValue) {
  43 + this.longValue = sumLongValue;
  44 + this.doubleValue = sumDoubleValue;
  45 + }
  46 +
  47 + public TsKvEntity(String strValue, Long longValue, Double doubleValue) {
  48 + this.strValue = strValue;
  49 + this.longValue = longValue;
  50 + this.doubleValue = doubleValue;
  51 + }
  52 +
  53 + public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount) {
  54 + if (booleanValueCount != 0) {
  55 + this.longValue = booleanValueCount;
  56 + } else if (strValueCount != 0) {
  57 + this.longValue = strValueCount;
  58 + } else if (longValueCount != 0) {
  59 + this.longValue = longValueCount;
  60 + } else if (doubleValueCount != 0) {
  61 + this.longValue = doubleValueCount;
  62 + }
  63 + }
  64 +
33 65 @Id
  66 + @Enumerated(EnumType.STRING)
34 67 @Column(name = ENTITY_TYPE_COLUMN)
35   - private String entityType;
  68 + private EntityType entityType;
36 69
37 70 @Id
38 71 @Column(name = ENTITY_ID_COLUMN)
... ... @@ -60,6 +93,16 @@ public final class TsKvEntity implements ToData<TsKvEntry> {
60 93
61 94 @Override
62 95 public TsKvEntry toData() {
63   - return null;
  96 + KvEntry kvEntry = null;
  97 + if (strValue != null) {
  98 + kvEntry = new StringDataEntry(key, strValue);
  99 + } else if (longValue != null) {
  100 + kvEntry = new LongDataEntry(key, longValue);
  101 + } else if (doubleValue != null) {
  102 + kvEntry = new DoubleDataEntry(key, doubleValue);
  103 + } else if (booleanValue != null) {
  104 + kvEntry = new BooleanDataEntry(key, booleanValue);
  105 + }
  106 + return new BasicTsKvEntry(ts, kvEntry);
64 107 }
65 108 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.dao.model.sql;
17 17
18 18 import lombok.*;
  19 +import org.thingsboard.server.common.data.EntityType;
19 20
20 21 import javax.persistence.Transient;
21 22 import java.io.Serializable;
... ... @@ -29,7 +30,7 @@ public class TsKvLatestCompositeKey implements Serializable{
29 30 @Transient
30 31 private static final long serialVersionUID = -4089175869616037523L;
31 32
32   - private String entityType;
  33 + private EntityType entityType;
33 34 private UUID entityId;
34 35 private String key;
35 36 }
\ No newline at end of file
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.dao.model.sql;
17 17
18 18 import lombok.Data;
  19 +import org.thingsboard.server.common.data.EntityType;
19 20 import org.thingsboard.server.common.data.kv.*;
20 21 import org.thingsboard.server.dao.model.ToData;
21 22
... ... @@ -31,8 +32,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
31 32 public final class TsKvLatestEntity implements ToData<TsKvEntry> {
32 33
33 34 @Id
  35 + @Enumerated(EnumType.STRING)
34 36 @Column(name = ENTITY_TYPE_COLUMN)
35   - private String entityType;
  37 + private EntityType entityType;
36 38
37 39 @Id
38 40 @Column(name = ENTITY_ID_COLUMN)
... ...
... ... @@ -83,7 +83,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
83 83 @Override
84 84 public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
85 85 AttributeKvEntity entity = new AttributeKvEntity();
86   - entity.setEntityType(entityId.getEntityType().name());
  86 + entity.setEntityType(entityId.getEntityType());
87 87 entity.setEntityId(entityId.getId());
88 88 entity.setAttributeType(attributeType);
89 89 entity.setAttributeKey(attribute.getKey());
... ... @@ -104,7 +104,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
104 104 .stream()
105 105 .map(key -> {
106 106 AttributeKvEntity entityToDelete = new AttributeKvEntity();
107   - entityToDelete.setEntityType(entityId.getEntityType().name());
  107 + entityToDelete.setEntityType(entityId.getEntityType());
108 108 entityToDelete.setEntityId(entityId.getId());
109 109 entityToDelete.setAttributeType(attributeType);
110 110 entityToDelete.setAttributeKey(key);
... ...
... ... @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
21 21 import com.google.common.util.concurrent.ListenableFuture;
22 22 import lombok.extern.slf4j.Slf4j;
23 23 import org.springframework.beans.factory.annotation.Autowired;
  24 +import org.springframework.data.domain.PageRequest;
24 25 import org.springframework.stereotype.Component;
25 26 import org.thingsboard.server.common.data.id.EntityId;
26 27 import org.thingsboard.server.common.data.kv.Aggregation;
... ... @@ -35,6 +36,7 @@ import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
35 36 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
36 37
37 38 import javax.annotation.Nullable;
  39 +import java.util.ArrayList;
38 40 import java.util.List;
39 41 import java.util.stream.Collectors;
40 42
... ... @@ -73,19 +75,93 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
73 75 if (query.getAggregation() == Aggregation.NONE) {
74 76 return findAllAsyncWithLimit(entityId, query);
75 77 } else {
76   - return service.submit(() -> null);
  78 + long stepTs = query.getStartTs();
  79 + List<ListenableFuture<TsKvEntry>> futures = new ArrayList<>();
  80 + while (stepTs < query.getEndTs()) {
  81 + long startTs = stepTs;
  82 + long endTs = stepTs + query.getInterval();
  83 + long ts = startTs + (endTs - startTs) / 2;
  84 + futures.add(findAndAggregateAsync(entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
  85 + stepTs = endTs;
  86 + }
  87 + return Futures.allAsList(futures);
  88 + }
  89 + }
  90 +
  91 + private ListenableFuture<TsKvEntry> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
  92 + TsKvEntity entity;
  93 + switch (aggregation) {
  94 + case AVG:
  95 + entity = tsKvRepository.findAvg(
  96 + entityId.getId(),
  97 + entityId.getEntityType(),
  98 + key,
  99 + startTs,
  100 + endTs);
  101 +
  102 + break;
  103 + case MAX:
  104 + entity = tsKvRepository.findMax(
  105 + entityId.getId(),
  106 + entityId.getEntityType(),
  107 + key,
  108 + startTs,
  109 + endTs);
  110 +
  111 + break;
  112 + case MIN:
  113 + entity = tsKvRepository.findMin(
  114 + entityId.getId(),
  115 + entityId.getEntityType(),
  116 + key,
  117 + startTs,
  118 + endTs);
  119 +
  120 + break;
  121 + case SUM:
  122 + entity = tsKvRepository.findSum(
  123 + entityId.getId(),
  124 + entityId.getEntityType(),
  125 + key,
  126 + startTs,
  127 + endTs);
  128 +
  129 + break;
  130 + case COUNT:
  131 + entity = tsKvRepository.findCount(
  132 + entityId.getId(),
  133 + entityId.getEntityType(),
  134 + key,
  135 + startTs,
  136 + endTs);
  137 +
  138 + break;
  139 + default:
  140 + entity = null;
  141 + }
  142 + if (entity != null){
  143 + entity.setTs(ts);
77 144 }
  145 + return service.submit(() -> DaoUtil.getData(entity));
78 146 }
79 147
80 148 private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) {
81   - return service.submit(() -> null);
  149 + return service.submit(() ->
  150 + DaoUtil.convertDataList(
  151 + tsKvRepository.findAllWithLimit(
  152 + entityId.getId(),
  153 + entityId.getEntityType(),
  154 + query.getKey(),
  155 + query.getStartTs(),
  156 + query.getEndTs(),
  157 + new PageRequest(0, query.getLimit()))));
82 158 }
83 159
84 160 @Override
85 161 public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
86 162 TsKvLatestCompositeKey compositeKey =
87 163 new TsKvLatestCompositeKey(
88   - entityId.getEntityType().name(),
  164 + entityId.getEntityType(),
89 165 entityId.getId(),
90 166 key);
91 167 return service.submit(() ->
... ... @@ -104,7 +180,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
104 180 @Override
105 181 public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
106 182 TsKvEntity entity = new TsKvEntity();
107   - entity.setEntityType(entityId.getEntityType().name());
  183 + entity.setEntityType(entityId.getEntityType());
108 184 entity.setEntityId(entityId.getId());
109 185 entity.setTs(tsKvEntry.getTs());
110 186 entity.setKey(tsKvEntry.getKey());
... ... @@ -126,7 +202,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
126 202 @Override
127 203 public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
128 204 TsKvLatestEntity latestEntity = new TsKvLatestEntity();
129   - latestEntity.setEntityType(entityId.getEntityType().name());
  205 + latestEntity.setEntityType(entityId.getEntityType());
130 206 latestEntity.setEntityId(entityId.getId());
131 207 latestEntity.setTs(tsKvEntry.getTs());
132 208 latestEntity.setKey(tsKvEntry.getKey());
... ...
... ... @@ -15,11 +15,76 @@
15 15 */
16 16 package org.thingsboard.server.dao.sql.timeseries;
17 17
  18 +import org.springframework.data.domain.Pageable;
  19 +import org.springframework.data.jpa.repository.Query;
18 20 import org.springframework.data.repository.CrudRepository;
  21 +import org.springframework.data.repository.query.Param;
  22 +import org.thingsboard.server.common.data.EntityType;
19 23 import org.thingsboard.server.dao.annotation.SqlDao;
20 24 import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
21 25 import org.thingsboard.server.dao.model.sql.TsKvEntity;
22 26
  27 +import java.util.List;
  28 +import java.util.UUID;
  29 +
23 30 @SqlDao
24 31 public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
  32 +
  33 + @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
  34 + "AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
  35 + "AND tskv.ts > :startTs AND tskv.ts < :endTs ORDER BY tskv.ts DESC")
  36 + List<TsKvEntity> findAllWithLimit(@Param("entityId") UUID entityId,
  37 + @Param("entityType") EntityType entityType,
  38 + @Param("entityKey") String key,
  39 + @Param("startTs") long startTs,
  40 + @Param("endTs") long endTs,
  41 + Pageable pageable);
  42 +
  43 + @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
  44 + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  45 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
  46 + TsKvEntity findMax(@Param("entityId") UUID entityId,
  47 + @Param("entityType") EntityType entityType,
  48 + @Param("entityKey") String entityKey,
  49 + @Param("startTs") long startTs,
  50 + @Param("endTs") long endTs);
  51 +
  52 + @Query("SELECT new TsKvEntity(MIN(tskv.strValue), MIN(tskv.longValue), MIN(tskv.doubleValue)) FROM TsKvEntity tskv " +
  53 + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  54 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
  55 + TsKvEntity findMin(@Param("entityId") UUID entityId,
  56 + @Param("entityType") EntityType entityType,
  57 + @Param("entityKey") String entityKey,
  58 + @Param("startTs") long startTs,
  59 + @Param("endTs") long endTs);
  60 +
  61 +
  62 + @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
  63 + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  64 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
  65 + TsKvEntity findCount(@Param("entityId") UUID entityId,
  66 + @Param("entityType") EntityType entityType,
  67 + @Param("entityKey") String entityKey,
  68 + @Param("startTs") long startTs,
  69 + @Param("endTs") long endTs);
  70 +
  71 +
  72 + @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
  73 + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  74 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
  75 + TsKvEntity findAvg(@Param("entityId") UUID entityId,
  76 + @Param("entityType") EntityType entityType,
  77 + @Param("entityKey") String entityKey,
  78 + @Param("startTs") long startTs,
  79 + @Param("endTs") long endTs);
  80 +
  81 +
  82 + @Query("SELECT new TsKvEntity(SUM(tskv.longValue), SUM(tskv.doubleValue)) FROM TsKvEntity tskv " +
  83 + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
  84 + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
  85 + TsKvEntity findSum(@Param("entityId") UUID entityId,
  86 + @Param("entityType") EntityType entityType,
  87 + @Param("entityKey") String entityKey,
  88 + @Param("startTs") long startTs,
  89 + @Param("endTs") long endTs);
25 90 }
... ...