Commit 87ade8ada31a4e3c424dce67bdadd78af9b6f78a

Authored by Igor Kulikov
2 parents 2f11d433 e563939c

Merge branch 'master' into develop/3.0

Showing 30 changed files with 258 additions and 707 deletions
@@ -93,6 +93,7 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ @@ -93,6 +93,7 @@ public class BaseRuleChainTransactionService implements RuleChainTransactionServ
93 TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration); 93 TbTransactionTask transactionTask = new TbTransactionTask(msg, onStart, onEnd, onFailure, System.currentTimeMillis() + duration);
94 int queueSize = queue.size(); 94 int queueSize = queue.size();
95 if (queueSize >= finalQueueSize) { 95 if (queueSize >= finalQueueSize) {
  96 + log.trace("Queue has no space: {}", transactionTask);
96 executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!"); 97 executeOnFailure(transactionTask.getOnFailure(), "Queue has no space!");
97 } else { 98 } else {
98 addMsgToQueues(queue, transactionTask); 99 addMsgToQueues(queue, transactionTask);
@@ -248,9 +248,9 @@ actors: @@ -248,9 +248,9 @@ actors:
248 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}" 248 error_persist_frequency: "${ACTORS_RULE_NODE_ERROR_FREQUENCY:3000}"
249 transaction: 249 transaction:
250 # Size of queues which store messages for transaction rule nodes 250 # Size of queues which store messages for transaction rule nodes
251 - queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:20}" 251 + queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}"
252 # Time in milliseconds for transaction to complete 252 # Time in milliseconds for transaction to complete
253 - duration: "${ACTORS_RULE_TRANSACTION_DURATION:15000}" 253 + duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}"
254 statistics: 254 statistics:
255 # Enable/disable actor statistics 255 # Enable/disable actor statistics
256 enabled: "${ACTORS_STATISTICS_ENABLED:true}" 256 enabled: "${ACTORS_STATISTICS_ENABLED:true}"
@@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.SqlTsDao; @@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.SqlTsDao;
26 26
27 @Configuration 27 @Configuration
28 @EnableAutoConfiguration 28 @EnableAutoConfiguration
29 -@ComponentScan({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest"})  
30 -@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.hsql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})  
31 -@EntityScan({"org.thingsboard.server.dao.model.sqlts.hsql", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"}) 29 +@ComponentScan({"org.thingsboard.server.dao.sqlts.hsql"})
  30 +@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.ts", "org.thingsboard.server.dao.sqlts.insert.hsql", "org.thingsboard.server.dao.sqlts.insert.latest.hsql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})
  31 +@EntityScan({"org.thingsboard.server.dao.model.sqlts.ts", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"})
32 @EnableTransactionManagement 32 @EnableTransactionManagement
33 @SqlTsDao 33 @SqlTsDao
34 @HsqlDao 34 @HsqlDao
35 public class HsqlTsDaoConfig { 35 public class HsqlTsDaoConfig {
36 36
37 -}  
  37 +}
@@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.SqlTsDao; @@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.SqlTsDao;
26 26
27 @Configuration 27 @Configuration
28 @EnableAutoConfiguration 28 @EnableAutoConfiguration
29 -@ComponentScan({"org.thingsboard.server.dao.sqlts.psql", "org.thingsboard.server.dao.sqlts.latest"})  
30 -@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.psql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})  
31 -@EntityScan({"org.thingsboard.server.dao.model.sqlts.psql", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"}) 29 +@ComponentScan({"org.thingsboard.server.dao.sqlts.psql"})
  30 +@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.ts", "org.thingsboard.server.dao.sqlts.insert.psql", "org.thingsboard.server.dao.sqlts.insert.latest.psql", "org.thingsboard.server.dao.sqlts.latest", "org.thingsboard.server.dao.sqlts.dictionary"})
  31 +@EntityScan({"org.thingsboard.server.dao.model.sqlts.ts", "org.thingsboard.server.dao.model.sqlts.latest", "org.thingsboard.server.dao.model.sqlts.dictionary"})
32 @EnableTransactionManagement 32 @EnableTransactionManagement
33 @SqlTsDao 33 @SqlTsDao
34 @PsqlDao 34 @PsqlDao
35 public class PsqlTsDaoConfig { 35 public class PsqlTsDaoConfig {
36 36
37 -}  
  37 +}
@@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao; @@ -26,12 +26,12 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao;
26 26
27 @Configuration 27 @Configuration
28 @EnableAutoConfiguration 28 @EnableAutoConfiguration
29 -@ComponentScan({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.latest"})  
30 -@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.dictionary", "org.thingsboard.server.dao.sqlts.latest"}) 29 +@ComponentScan({"org.thingsboard.server.dao.sqlts.timescale"})
  30 +@EnableJpaRepositories({"org.thingsboard.server.dao.sqlts.timescale", "org.thingsboard.server.dao.sqlts.insert.latest.psql", "org.thingsboard.server.dao.sqlts.insert.timescale", "org.thingsboard.server.dao.sqlts.dictionary", "org.thingsboard.server.dao.sqlts.latest"})
31 @EntityScan({"org.thingsboard.server.dao.model.sqlts.timescale", "org.thingsboard.server.dao.model.sqlts.dictionary", "org.thingsboard.server.dao.model.sqlts.latest"}) 31 @EntityScan({"org.thingsboard.server.dao.model.sqlts.timescale", "org.thingsboard.server.dao.model.sqlts.dictionary", "org.thingsboard.server.dao.model.sqlts.latest"})
32 @EnableTransactionManagement 32 @EnableTransactionManagement
33 @TimescaleDBTsDao 33 @TimescaleDBTsDao
34 @PsqlDao 34 @PsqlDao
35 public class TimescaleDaoConfig { 35 public class TimescaleDaoConfig {
36 36
37 -}  
  37 +}
1 -/**  
2 - * Copyright © 2016-2020 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.model.sqlts.hsql;  
17 -  
18 -import lombok.AllArgsConstructor;  
19 -import lombok.Data;  
20 -import lombok.NoArgsConstructor;  
21 -import org.thingsboard.server.common.data.EntityType;  
22 -  
23 -import javax.persistence.Transient;  
24 -import java.io.Serializable;  
25 -import java.util.UUID;  
26 -  
27 -@Data  
28 -@AllArgsConstructor  
29 -@NoArgsConstructor  
30 -public class TsKvCompositeKey implements Serializable {  
31 -  
32 - @Transient  
33 - private static final long serialVersionUID = -4089175869616037523L;  
34 -  
35 - private UUID entityId;  
36 - private int key;  
37 - private long ts;  
38 -  
39 -}  
1 -/**  
2 - * Copyright © 2016-2020 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.model.sqlts.hsql;  
17 -  
18 -import lombok.Data;  
19 -import org.thingsboard.server.common.data.kv.TsKvEntry;  
20 -import org.thingsboard.server.dao.model.ToData;  
21 -import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;  
22 -  
23 -import javax.persistence.Column;  
24 -import javax.persistence.Entity;  
25 -import javax.persistence.Id;  
26 -import javax.persistence.IdClass;  
27 -import javax.persistence.Table;  
28 -  
29 -import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;  
30 -  
31 -@Data  
32 -@Entity  
33 -@Table(name = "ts_kv")  
34 -@IdClass(TsKvCompositeKey.class)  
35 -public final class TsKvEntity extends AbstractTsKvEntity implements ToData<TsKvEntry> {  
36 -  
37 - @Id  
38 - @Column(name = KEY_COLUMN)  
39 - private int key;  
40 -  
41 - public TsKvEntity() {  
42 - }  
43 -  
44 - public TsKvEntity(String strValue) {  
45 - this.strValue = strValue;  
46 - }  
47 -  
48 - public TsKvEntity(Long longValue, Double doubleValue, Long longCountValue, Long doubleCountValue, String aggType) {  
49 - if (!isAllNull(longValue, doubleValue, longCountValue, doubleCountValue)) {  
50 - switch (aggType) {  
51 - case AVG:  
52 - double sum = 0.0;  
53 - if (longValue != null) {  
54 - sum += longValue;  
55 - }  
56 - if (doubleValue != null) {  
57 - sum += doubleValue;  
58 - }  
59 - long totalCount = longCountValue + doubleCountValue;  
60 - if (totalCount > 0) {  
61 - this.doubleValue = sum / (longCountValue + doubleCountValue);  
62 - } else {  
63 - this.doubleValue = 0.0;  
64 - }  
65 - break;  
66 - case SUM:  
67 - if (doubleCountValue > 0) {  
68 - this.doubleValue = doubleValue + (longValue != null ? longValue.doubleValue() : 0.0);  
69 - } else {  
70 - this.longValue = longValue;  
71 - }  
72 - break;  
73 - case MIN:  
74 - case MAX:  
75 - if (longCountValue > 0 && doubleCountValue > 0) {  
76 - this.doubleValue = MAX.equals(aggType) ? Math.max(doubleValue, longValue.doubleValue()) : Math.min(doubleValue, longValue.doubleValue());  
77 - } else if (doubleCountValue > 0) {  
78 - this.doubleValue = doubleValue;  
79 - } else if (longCountValue > 0) {  
80 - this.longValue = longValue;  
81 - }  
82 - break;  
83 - }  
84 - }  
85 - }  
86 -  
87 - public TsKvEntity(Long booleanValueCount, Long strValueCount, Long longValueCount, Long doubleValueCount, Long jsonValueCount) {  
88 - if (!isAllNull(booleanValueCount, strValueCount, longValueCount, doubleValueCount)) {  
89 - if (booleanValueCount != 0) {  
90 - this.longValue = booleanValueCount;  
91 - } else if (strValueCount != 0) {  
92 - this.longValue = strValueCount;  
93 - } else if (jsonValueCount != 0) {  
94 - this.longValue = jsonValueCount;  
95 - } else {  
96 - this.longValue = longValueCount + doubleValueCount;  
97 - }  
98 - }  
99 - }  
100 -  
101 - @Override  
102 - public boolean isNotEmpty() {  
103 - return strValue != null || longValue != null || doubleValue != null || booleanValue != null;  
104 - }  
105 -}  
dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvCompositeKey.java renamed from dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvCompositeKey.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.model.sqlts.timescale; 16 +package org.thingsboard.server.dao.model.sqlts.timescale.ts;
17 17
18 import lombok.AllArgsConstructor; 18 import lombok.AllArgsConstructor;
19 import lombok.Data; 19 import lombok.Data;
@@ -35,4 +35,4 @@ public class TimescaleTsKvCompositeKey implements Serializable { @@ -35,4 +35,4 @@ public class TimescaleTsKvCompositeKey implements Serializable {
35 private UUID entityId; 35 private UUID entityId;
36 private int key; 36 private int key;
37 private long ts; 37 private long ts;
38 -}  
  38 +}
dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/ts/TimescaleTsKvEntity.java renamed from dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.model.sqlts.timescale; 16 +package org.thingsboard.server.dao.model.sqlts.timescale.ts;
17 17
18 import lombok.Data; 18 import lombok.Data;
19 import lombok.EqualsAndHashCode; 19 import lombok.EqualsAndHashCode;
@@ -191,4 +191,4 @@ public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToD @@ -191,4 +191,4 @@ public final class TimescaleTsKvEntity extends AbstractTsKvEntity implements ToD
191 public boolean isNotEmpty() { 191 public boolean isNotEmpty() {
192 return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null); 192 return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null || jsonValue != null);
193 } 193 }
194 -}  
  194 +}
dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvCompositeKey.java renamed from dao/src/main/java/org/thingsboard/server/dao/model/sqlts/psql/TsKvCompositeKey.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.model.sqlts.psql; 16 +package org.thingsboard.server.dao.model.sqlts.ts;
17 17
18 import lombok.AllArgsConstructor; 18 import lombok.AllArgsConstructor;
19 import lombok.Data; 19 import lombok.Data;
@@ -34,4 +34,4 @@ public class TsKvCompositeKey implements Serializable { @@ -34,4 +34,4 @@ public class TsKvCompositeKey implements Serializable {
34 private UUID entityId; 34 private UUID entityId;
35 private int key; 35 private int key;
36 private long ts; 36 private long ts;
37 -}  
  37 +}
dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java renamed from dao/src/main/java/org/thingsboard/server/dao/model/sqlts/psql/TsKvEntity.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.model.sqlts.psql; 16 +package org.thingsboard.server.dao.model.sqlts.ts;
17 17
18 import lombok.Data; 18 import lombok.Data;
19 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; 19 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
@@ -15,32 +15,45 @@ @@ -15,32 +15,45 @@
15 */ 15 */
16 package org.thingsboard.server.dao.sqlts; 16 package org.thingsboard.server.dao.sqlts;
17 17
  18 +import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture; 19 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture; 20 import com.google.common.util.concurrent.SettableFuture;
20 import lombok.extern.slf4j.Slf4j; 21 import lombok.extern.slf4j.Slf4j;
21 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.data.domain.PageRequest;
  24 +import org.springframework.data.domain.Sort;
22 import org.thingsboard.server.common.data.id.EntityId; 25 import org.thingsboard.server.common.data.id.EntityId;
23 import org.thingsboard.server.common.data.id.TenantId; 26 import org.thingsboard.server.common.data.id.TenantId;
24 import org.thingsboard.server.common.data.kv.Aggregation; 27 import org.thingsboard.server.common.data.kv.Aggregation;
  28 +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
  29 +import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
25 import org.thingsboard.server.common.data.kv.TsKvEntry; 30 import org.thingsboard.server.common.data.kv.TsKvEntry;
26 -import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; 31 +import org.thingsboard.server.dao.DaoUtil;
  32 +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
27 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; 33 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
28 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; 34 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  35 +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
  36 +import org.thingsboard.server.dao.sqlts.ts.TsKvRepository;
  37 +import org.thingsboard.server.dao.timeseries.TimeseriesDao;
29 38
30 import javax.annotation.PostConstruct; 39 import javax.annotation.PostConstruct;
31 import javax.annotation.PreDestroy; 40 import javax.annotation.PreDestroy;
  41 +import java.util.ArrayList;
32 import java.util.List; 42 import java.util.List;
33 import java.util.Optional; 43 import java.util.Optional;
34 import java.util.concurrent.CompletableFuture; 44 import java.util.concurrent.CompletableFuture;
35 import java.util.stream.Collectors; 45 import java.util.stream.Collectors;
36 46
37 @Slf4j 47 @Slf4j
38 -public abstract class AbstractChunkedAggregationTimeseriesDao<T extends AbstractTsKvEntity> extends AbstractSqlTimeseriesDao { 48 +public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSqlTimeseriesDao implements TimeseriesDao {
39 49
40 @Autowired 50 @Autowired
41 - protected InsertTsRepository<T> insertRepository; 51 + protected TsKvRepository tsKvRepository;
42 52
43 - protected TbSqlBlockingQueue<EntityContainer<T>> tsQueue; 53 + @Autowired
  54 + protected InsertTsRepository<TsKvEntity> insertRepository;
  55 +
  56 + protected TbSqlBlockingQueue<EntityContainer<TsKvEntity>> tsQueue;
44 57
45 @PostConstruct 58 @PostConstruct
46 protected void init() { 59 protected void init() {
@@ -63,9 +76,102 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract @@ -63,9 +76,102 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract
63 } 76 }
64 } 77 }
65 78
66 - protected abstract ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation); 79 + @Override
  80 + public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
  81 + return service.submit(() -> {
  82 + tsKvRepository.delete(
  83 + entityId.getId(),
  84 + getOrSaveKeyId(query.getKey()),
  85 + query.getStartTs(),
  86 + query.getEndTs());
  87 + return null;
  88 + });
  89 + }
  90 +
  91 + @Override
  92 + public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
  93 + return getSaveLatestFuture(entityId, tsKvEntry);
  94 + }
  95 +
  96 + @Override
  97 + public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
  98 + return getRemoveLatestFuture(tenantId, entityId, query);
  99 + }
  100 +
  101 + @Override
  102 + public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
  103 + return getFindLatestFuture(entityId, key);
  104 + }
  105 +
  106 + @Override
  107 + public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
  108 + return getFindAllLatestFuture(entityId);
  109 + }
  110 +
  111 + @Override
  112 + public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {
  113 + return Futures.immediateFuture(null);
  114 + }
  115 +
  116 + @Override
  117 + public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
  118 + return Futures.immediateFuture(null);
  119 + }
  120 +
  121 + @Override
  122 + public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
  123 + return processFindAllAsync(tenantId, entityId, queries);
  124 + }
  125 +
  126 + @Override
  127 + protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
  128 + if (query.getAggregation() == Aggregation.NONE) {
  129 + return findAllAsyncWithLimit(tenantId, entityId, query);
  130 + } else {
  131 + long stepTs = query.getStartTs();
  132 + List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();
  133 + while (stepTs < query.getEndTs()) {
  134 + long startTs = stepTs;
  135 + long endTs = stepTs + query.getInterval();
  136 + long ts = startTs + (endTs - startTs) / 2;
  137 + futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));
  138 + stepTs = endTs;
  139 + }
  140 + return getTskvEntriesFuture(Futures.allAsList(futures));
  141 + }
  142 + }
  143 +
  144 + @Override
  145 + protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
  146 + Integer keyId = getOrSaveKeyId(query.getKey());
  147 + List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(
  148 + entityId.getId(),
  149 + keyId,
  150 + query.getStartTs(),
  151 + query.getEndTs(),
  152 + new PageRequest(0, query.getLimit(),
  153 + new Sort(Sort.Direction.fromString(
  154 + query.getOrderBy()), "ts")));
  155 + tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));
  156 + return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities));
  157 + }
67 158
68 - protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<T>> entitiesFutures) { 159 + protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {
  160 + List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();
  161 + switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);
  162 + return Futures.transform(setFutures(entitiesFutures), entity -> {
  163 + if (entity != null && entity.isNotEmpty()) {
  164 + entity.setEntityId(entityId.getId());
  165 + entity.setStrKey(key);
  166 + entity.setTs(ts);
  167 + return Optional.of(DaoUtil.getData(entity));
  168 + } else {
  169 + return Optional.empty();
  170 + }
  171 + });
  172 + }
  173 +
  174 + protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
69 switch (aggregation) { 175 switch (aggregation) {
70 case AVG: 176 case AVG:
71 findAvg(tenantId, entityId, key, startTs, endTs, entitiesFutures); 177 findAvg(tenantId, entityId, key, startTs, endTs, entitiesFutures);
@@ -87,19 +193,64 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract @@ -87,19 +193,64 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract
87 } 193 }
88 } 194 }
89 195
90 - protected abstract void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures); 196 + protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  197 + Integer keyId = getOrSaveKeyId(key);
  198 + entitiesFutures.add(tsKvRepository.findCount(
  199 + entityId.getId(),
  200 + keyId,
  201 + startTs,
  202 + endTs));
  203 + }
91 204
92 - protected abstract void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures); 205 + protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  206 + Integer keyId = getOrSaveKeyId(key);
  207 + entitiesFutures.add(tsKvRepository.findSum(
  208 + entityId.getId(),
  209 + keyId,
  210 + startTs,
  211 + endTs));
  212 + }
93 213
94 - protected abstract void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures); 214 + protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  215 + Integer keyId = getOrSaveKeyId(key);
  216 + entitiesFutures.add(tsKvRepository.findStringMin(
  217 + entityId.getId(),
  218 + keyId,
  219 + startTs,
  220 + endTs));
  221 + entitiesFutures.add(tsKvRepository.findNumericMin(
  222 + entityId.getId(),
  223 + keyId,
  224 + startTs,
  225 + endTs));
  226 + }
95 227
96 - protected abstract void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures); 228 + protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  229 + Integer keyId = getOrSaveKeyId(key);
  230 + entitiesFutures.add(tsKvRepository.findStringMax(
  231 + entityId.getId(),
  232 + keyId,
  233 + startTs,
  234 + endTs));
  235 + entitiesFutures.add(tsKvRepository.findNumericMax(
  236 + entityId.getId(),
  237 + keyId,
  238 + startTs,
  239 + endTs));
  240 + }
97 241
98 - protected abstract void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<T>> entitiesFutures); 242 + protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  243 + Integer keyId = getOrSaveKeyId(key);
  244 + entitiesFutures.add(tsKvRepository.findAvg(
  245 + entityId.getId(),
  246 + keyId,
  247 + startTs,
  248 + endTs));
  249 + }
99 250
100 - protected SettableFuture<T> setFutures(List<CompletableFuture<T>> entitiesFutures) {  
101 - SettableFuture<T> listenableFuture = SettableFuture.create();  
102 - CompletableFuture<List<T>> entities = 251 + protected SettableFuture<TsKvEntity> setFutures(List<CompletableFuture<TsKvEntity>> entitiesFutures) {
  252 + SettableFuture<TsKvEntity> listenableFuture = SettableFuture.create();
  253 + CompletableFuture<List<TsKvEntity>> entities =
103 CompletableFuture.allOf(entitiesFutures.toArray(new CompletableFuture[entitiesFutures.size()])) 254 CompletableFuture.allOf(entitiesFutures.toArray(new CompletableFuture[entitiesFutures.size()]))
104 .thenApply(v -> entitiesFutures.stream() 255 .thenApply(v -> entitiesFutures.stream()
105 .map(CompletableFuture::join) 256 .map(CompletableFuture::join)
@@ -109,8 +260,8 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract @@ -109,8 +260,8 @@ public abstract class AbstractChunkedAggregationTimeseriesDao<T extends Abstract
109 if (throwable != null) { 260 if (throwable != null) {
110 listenableFuture.setException(throwable); 261 listenableFuture.setException(throwable);
111 } else { 262 } else {
112 - T result = null;  
113 - for (T entity : tsKvEntities) { 263 + TsKvEntity result = null;
  264 + for (TsKvEntity entity : tsKvEntities) {
114 if (entity.isNotEmpty()) { 265 if (entity.isNotEmpty()) {
115 result = entity; 266 result = entity;
116 break; 267 break;
@@ -43,6 +43,7 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; @@ -43,6 +43,7 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
43 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; 43 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
44 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; 44 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
45 import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository; 45 import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository;
  46 +import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
46 import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository; 47 import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
47 import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; 48 import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
48 import org.thingsboard.server.dao.timeseries.SimpleListenableFuture; 49 import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
@@ -15,46 +15,25 @@ @@ -15,46 +15,25 @@
15 */ 15 */
16 package org.thingsboard.server.dao.sqlts.hsql; 16 package org.thingsboard.server.dao.sqlts.hsql;
17 17
18 -import com.google.common.util.concurrent.Futures;  
19 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
20 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
21 -import org.springframework.beans.factory.annotation.Autowired;  
22 -import org.springframework.data.domain.PageRequest;  
23 -import org.springframework.data.domain.Sort;  
24 import org.springframework.stereotype.Component; 20 import org.springframework.stereotype.Component;
25 import org.thingsboard.server.common.data.id.EntityId; 21 import org.thingsboard.server.common.data.id.EntityId;
26 import org.thingsboard.server.common.data.id.TenantId; 22 import org.thingsboard.server.common.data.id.TenantId;
27 -import org.thingsboard.server.common.data.kv.Aggregation;  
28 -import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;  
29 -import org.thingsboard.server.common.data.kv.ReadTsKvQuery;  
30 import org.thingsboard.server.common.data.kv.TsKvEntry; 23 import org.thingsboard.server.common.data.kv.TsKvEntry;
31 -import org.thingsboard.server.dao.DaoUtil;  
32 -import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity; 24 +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
33 import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao; 25 import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao;
34 import org.thingsboard.server.dao.sqlts.EntityContainer; 26 import org.thingsboard.server.dao.sqlts.EntityContainer;
35 import org.thingsboard.server.dao.timeseries.TimeseriesDao; 27 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
36 import org.thingsboard.server.dao.util.HsqlDao; 28 import org.thingsboard.server.dao.util.HsqlDao;
37 import org.thingsboard.server.dao.util.SqlTsDao; 29 import org.thingsboard.server.dao.util.SqlTsDao;
38 30
39 -import java.util.ArrayList;  
40 -import java.util.List;  
41 -import java.util.Optional;  
42 -import java.util.concurrent.CompletableFuture;  
43 -  
44 31
45 @Component 32 @Component
46 @Slf4j 33 @Slf4j
47 @SqlTsDao 34 @SqlTsDao
48 @HsqlDao 35 @HsqlDao
49 -public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao<TsKvEntity> implements TimeseriesDao {  
50 -  
51 - @Autowired  
52 - private TsKvHsqlRepository tsKvRepository;  
53 -  
54 - @Override  
55 - public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {  
56 - return processFindAllAsync(tenantId, entityId, queries);  
57 - } 36 +public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao implements TimeseriesDao {
58 37
59 @Override 38 @Override
60 public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { 39 public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
@@ -72,154 +51,4 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @@ -72,154 +51,4 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
72 return tsQueue.add(new EntityContainer(entity, null)); 51 return tsQueue.add(new EntityContainer(entity, null));
73 } 52 }
74 53
75 - @Override  
76 - public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {  
77 - return service.submit(() -> {  
78 - tsKvRepository.delete(  
79 - entityId.getId(),  
80 - getOrSaveKeyId(query.getKey()),  
81 - query.getStartTs(),  
82 - query.getEndTs());  
83 - return null;  
84 - });  
85 - }  
86 -  
87 - @Override  
88 - public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {  
89 - return getSaveLatestFuture(entityId, tsKvEntry);  
90 - }  
91 -  
92 - @Override  
93 - public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {  
94 - return getRemoveLatestFuture(tenantId, entityId, query);  
95 - }  
96 -  
97 - @Override  
98 - public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {  
99 - return getFindLatestFuture(entityId, key);  
100 - }  
101 -  
102 - @Override  
103 - public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {  
104 - return getFindAllLatestFuture(entityId);  
105 - }  
106 -  
107 - @Override  
108 - public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {  
109 - return Futures.immediateFuture(null);  
110 - }  
111 -  
112 - @Override  
113 - public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {  
114 - return Futures.immediateFuture(null);  
115 - }  
116 -  
117 - protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {  
118 - if (query.getAggregation() == Aggregation.NONE) {  
119 - return findAllAsyncWithLimit(tenantId, entityId, query);  
120 - } else {  
121 - long stepTs = query.getStartTs();  
122 - List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();  
123 - while (stepTs < query.getEndTs()) {  
124 - long startTs = stepTs;  
125 - long endTs = stepTs + query.getInterval();  
126 - long ts = startTs + (endTs - startTs) / 2;  
127 - futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));  
128 - stepTs = endTs;  
129 - }  
130 - return getTskvEntriesFuture(Futures.allAsList(futures));  
131 - }  
132 - }  
133 -  
134 - @Override  
135 - protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {  
136 - List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(  
137 - entityId.getId(),  
138 - getOrSaveKeyId(query.getKey()),  
139 - query.getStartTs(),  
140 - query.getEndTs(),  
141 - new PageRequest(0, query.getLimit(),  
142 - new Sort(Sort.Direction.fromString(  
143 - query.getOrderBy()), "ts")));  
144 - tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));  
145 - return Futures.immediateFuture(  
146 - DaoUtil.convertDataList(  
147 - tsKvEntities));  
148 - }  
149 -  
150 - @Override  
151 - protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {  
152 - List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();  
153 - switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);  
154 - return Futures.transform(setFutures(entitiesFutures), entity -> {  
155 - if (entity != null && entity.isNotEmpty()) {  
156 - entity.setEntityId(entityId.getId());  
157 - entity.setKey(getOrSaveKeyId(key));  
158 - entity.setTs(ts);  
159 - return Optional.of(DaoUtil.getData(entity));  
160 - } else {  
161 - return Optional.empty();  
162 - }  
163 - });  
164 - }  
165 -  
166 - @Override  
167 - protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
168 - Integer keyId = getOrSaveKeyId(key);  
169 - entitiesFutures.add(tsKvRepository.findCount(  
170 - entityId.getId(),  
171 - keyId,  
172 - startTs,  
173 - endTs));  
174 - }  
175 -  
176 - @Override  
177 - protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
178 - Integer keyId = getOrSaveKeyId(key);  
179 - entitiesFutures.add(tsKvRepository.findSum(  
180 - entityId.getId(),  
181 - keyId,  
182 - startTs,  
183 - endTs));  
184 - }  
185 -  
186 - @Override  
187 - protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
188 - Integer keyId = getOrSaveKeyId(key);  
189 - entitiesFutures.add(tsKvRepository.findStringMin(  
190 - entityId.getId(),  
191 - keyId,  
192 - startTs,  
193 - endTs));  
194 - entitiesFutures.add(tsKvRepository.findNumericMin(  
195 - entityId.getId(),  
196 - keyId,  
197 - startTs,  
198 - endTs));  
199 - }  
200 -  
201 - @Override  
202 - protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
203 - Integer keyId = getOrSaveKeyId(key);  
204 - entitiesFutures.add(tsKvRepository.findStringMax(  
205 - entityId.getId(),  
206 - keyId,  
207 - startTs,  
208 - endTs));  
209 - entitiesFutures.add(tsKvRepository.findNumericMax(  
210 - entityId.getId(),  
211 - keyId,  
212 - startTs,  
213 - endTs));  
214 - }  
215 -  
216 - @Override  
217 - protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
218 - Integer keyId = getOrSaveKeyId(key);  
219 - entitiesFutures.add(tsKvRepository.findAvg(  
220 - entityId.getId(),  
221 - keyId,  
222 - startTs,  
223 - endTs));  
224 - }  
225 -}  
  54 +}
1 -/**  
2 - * Copyright © 2016-2020 The Thingsboard Authors  
3 - *  
4 - * Licensed under the Apache License, Version 2.0 (the "License");  
5 - * you may not use this file except in compliance with the License.  
6 - * You may obtain a copy of the License at  
7 - *  
8 - * http://www.apache.org/licenses/LICENSE-2.0  
9 - *  
10 - * Unless required by applicable law or agreed to in writing, software  
11 - * distributed under the License is distributed on an "AS IS" BASIS,  
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
13 - * See the License for the specific language governing permissions and  
14 - * limitations under the License.  
15 - */  
16 -package org.thingsboard.server.dao.sqlts.hsql;  
17 -  
18 -import org.springframework.data.domain.Pageable;  
19 -import org.springframework.data.jpa.repository.Modifying;  
20 -import org.springframework.data.jpa.repository.Query;  
21 -import org.springframework.data.repository.CrudRepository;  
22 -import org.springframework.data.repository.query.Param;  
23 -import org.springframework.scheduling.annotation.Async;  
24 -import org.springframework.transaction.annotation.Transactional;  
25 -import org.thingsboard.server.dao.model.sqlts.hsql.TsKvCompositeKey;  
26 -import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity;  
27 -import org.thingsboard.server.dao.util.SqlDao;  
28 -  
29 -import java.util.List;  
30 -import java.util.UUID;  
31 -import java.util.concurrent.CompletableFuture;  
32 -  
33 -@SqlDao  
34 -public interface TsKvHsqlRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {  
35 -  
36 - @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +  
37 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
38 - List<TsKvEntity> findAllWithLimit(@Param("entityId") UUID entityId,  
39 - @Param("entityKey") int key,  
40 - @Param("startTs") long startTs,  
41 - @Param("endTs") long endTs,  
42 - Pageable pageable);  
43 -  
44 - @Transactional  
45 - @Modifying  
46 - @Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +  
47 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
48 - void delete(@Param("entityId") UUID entityId,  
49 - @Param("entityKey") int key,  
50 - @Param("startTs") long startTs,  
51 - @Param("endTs") long endTs);  
52 -  
53 - @Async  
54 - @Query("SELECT new TsKvEntity(MAX(tskv.strValue)) FROM TsKvEntity tskv " +  
55 - "WHERE tskv.strValue IS NOT NULL AND tskv.entityId = :entityId AND tskv.key = :entityKey" +  
56 - " AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
57 - CompletableFuture<TsKvEntity> findStringMax(@Param("entityId") UUID entityId,  
58 - @Param("entityKey") int entityKey,  
59 - @Param("startTs") long startTs,  
60 - @Param("endTs") long endTs);  
61 -  
62 - @Async  
63 - @Query("SELECT new TsKvEntity(MAX(COALESCE(tskv.longValue, -9223372036854775807)), " +  
64 - "MAX(COALESCE(tskv.doubleValue, -1.79769E+308)), " +  
65 - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +  
66 - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +  
67 - "'MAX') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +  
68 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
69 - CompletableFuture<TsKvEntity> findNumericMax(@Param("entityId") UUID entityId,  
70 - @Param("entityKey") int entityKey,  
71 - @Param("startTs") long startTs,  
72 - @Param("endTs") long endTs);  
73 -  
74 -  
75 - @Async  
76 - @Query("SELECT new TsKvEntity(MIN(tskv.strValue)) FROM TsKvEntity tskv " +  
77 - "WHERE tskv.strValue IS NOT NULL AND tskv.entityId = :entityId " +  
78 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
79 - CompletableFuture<TsKvEntity> findStringMin(@Param("entityId") UUID entityId,  
80 - @Param("entityKey") int entityKey,  
81 - @Param("startTs") long startTs,  
82 - @Param("endTs") long endTs);  
83 -  
84 - @Async  
85 - @Query("SELECT new TsKvEntity(MIN(COALESCE(tskv.longValue, 9223372036854775807)), " +  
86 - "MIN(COALESCE(tskv.doubleValue, 1.79769E+308)), " +  
87 - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +  
88 - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +  
89 - "'MIN') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +  
90 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
91 - CompletableFuture<TsKvEntity> findNumericMin(@Param("entityId") UUID entityId,  
92 - @Param("entityKey") int entityKey,  
93 - @Param("startTs") long startTs,  
94 - @Param("endTs") long endTs);  
95 -  
96 - @Async  
97 - @Query("SELECT new TsKvEntity(SUM(CASE WHEN tskv.booleanValue IS NULL THEN 0 ELSE 1 END), " +  
98 - "SUM(CASE WHEN tskv.strValue IS NULL THEN 0 ELSE 1 END), " +  
99 - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +  
100 - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +  
101 - "SUM(CASE WHEN tskv.jsonValue IS NULL THEN 0 ELSE 1 END)) FROM TsKvEntity tskv " +  
102 - "WHERE tskv.entityId = :entityId AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
103 - CompletableFuture<TsKvEntity> findCount(@Param("entityId") UUID entityId,  
104 - @Param("entityKey") int entityKey,  
105 - @Param("startTs") long startTs,  
106 - @Param("endTs") long endTs);  
107 -  
108 - @Async  
109 - @Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " +  
110 - "SUM(COALESCE(tskv.doubleValue, 0.0)), " +  
111 - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +  
112 - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +  
113 - "'AVG') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +  
114 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
115 - CompletableFuture<TsKvEntity> findAvg(@Param("entityId") UUID entityId,  
116 - @Param("entityKey") int entityKey,  
117 - @Param("startTs") long startTs,  
118 - @Param("endTs") long endTs);  
119 -  
120 - @Async  
121 - @Query("SELECT new TsKvEntity(SUM(COALESCE(tskv.longValue, 0)), " +  
122 - "SUM(COALESCE(tskv.doubleValue, 0.0)), " +  
123 - "SUM(CASE WHEN tskv.longValue IS NULL THEN 0 ELSE 1 END), " +  
124 - "SUM(CASE WHEN tskv.doubleValue IS NULL THEN 0 ELSE 1 END), " +  
125 - "'SUM') FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +  
126 - "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")  
127 - CompletableFuture<TsKvEntity> findSum(@Param("entityId") UUID entityId,  
128 - @Param("entityKey") int entityKey,  
129 - @Param("startTs") long startTs,  
130 - @Param("endTs") long endTs);  
131 -  
132 -}  
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/AbstractInsertRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts; 16 +package org.thingsboard.server.dao.sqlts.insert;
17 17
18 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.beans.factory.annotation.Autowired;
19 import org.springframework.beans.factory.annotation.Value; 19 import org.springframework.beans.factory.annotation.Value;
@@ -44,4 +44,4 @@ public abstract class AbstractInsertRepository { @@ -44,4 +44,4 @@ public abstract class AbstractInsertRepository {
44 } 44 }
45 return strValue; 45 return strValue;
46 } 46 }
47 -}  
  47 +}
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/InsertTsRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/InsertTsRepository.java
@@ -13,9 +13,10 @@ @@ -13,9 +13,10 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts; 16 +package org.thingsboard.server.dao.sqlts.insert;
17 17
18 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; 18 import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
  19 +import org.thingsboard.server.dao.sqlts.EntityContainer;
19 20
20 import java.util.List; 21 import java.util.List;
21 22
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/hsql/HsqlInsertTsRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/HsqlInsertTsRepository.java
@@ -13,15 +13,15 @@ @@ -13,15 +13,15 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts.hsql; 16 +package org.thingsboard.server.dao.sqlts.insert.hsql;
17 17
18 import org.springframework.jdbc.core.BatchPreparedStatementSetter; 18 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
19 import org.springframework.stereotype.Repository; 19 import org.springframework.stereotype.Repository;
20 import org.springframework.transaction.annotation.Transactional; 20 import org.springframework.transaction.annotation.Transactional;
21 -import org.thingsboard.server.dao.model.sqlts.hsql.TsKvEntity;  
22 -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository; 21 +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
23 import org.thingsboard.server.dao.sqlts.EntityContainer; 22 import org.thingsboard.server.dao.sqlts.EntityContainer;
24 -import org.thingsboard.server.dao.sqlts.InsertTsRepository; 23 +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
  24 +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
25 import org.thingsboard.server.dao.util.HsqlDao; 25 import org.thingsboard.server.dao.util.HsqlDao;
26 import org.thingsboard.server.dao.util.SqlTsDao; 26 import org.thingsboard.server.dao.util.SqlTsDao;
27 27
@@ -86,4 +86,4 @@ public class HsqlInsertTsRepository extends AbstractInsertRepository implements @@ -86,4 +86,4 @@ public class HsqlInsertTsRepository extends AbstractInsertRepository implements
86 } 86 }
87 }); 87 });
88 } 88 }
89 -}  
  89 +}
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/InsertLatestTsRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/InsertLatestTsRepository.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts; 16 +package org.thingsboard.server.dao.sqlts.insert.latest;
17 17
18 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; 18 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
19 19
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/hsql/HsqlLatestInsertTsRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/HsqlLatestInsertTsRepository.java
@@ -13,14 +13,14 @@ @@ -13,14 +13,14 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts.latest; 16 +package org.thingsboard.server.dao.sqlts.insert.latest.hsql;
17 17
18 import org.springframework.jdbc.core.BatchPreparedStatementSetter; 18 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
19 import org.springframework.stereotype.Repository; 19 import org.springframework.stereotype.Repository;
20 import org.springframework.transaction.annotation.Transactional; 20 import org.springframework.transaction.annotation.Transactional;
21 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; 21 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
22 -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository;  
23 -import org.thingsboard.server.dao.sqlts.InsertLatestTsRepository; 22 +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
  23 +import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
24 import org.thingsboard.server.dao.util.HsqlDao; 24 import org.thingsboard.server.dao.util.HsqlDao;
25 import org.thingsboard.server.dao.util.SqlTsDao; 25 import org.thingsboard.server.dao.util.SqlTsDao;
26 26
@@ -82,4 +82,4 @@ public class HsqlLatestInsertTsRepository extends AbstractInsertRepository imple @@ -82,4 +82,4 @@ public class HsqlLatestInsertTsRepository extends AbstractInsertRepository imple
82 } 82 }
83 }); 83 });
84 } 84 }
85 -}  
  85 +}
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/psql/PsqlLatestInsertTsRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/PsqlLatestInsertTsRepository.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts.latest; 16 +package org.thingsboard.server.dao.sqlts.insert.latest.psql;
17 17
18 import org.springframework.jdbc.core.BatchPreparedStatementSetter; 18 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
19 import org.springframework.stereotype.Repository; 19 import org.springframework.stereotype.Repository;
@@ -21,8 +21,8 @@ import org.springframework.transaction.TransactionStatus; @@ -21,8 +21,8 @@ import org.springframework.transaction.TransactionStatus;
21 import org.springframework.transaction.annotation.Transactional; 21 import org.springframework.transaction.annotation.Transactional;
22 import org.springframework.transaction.support.TransactionCallbackWithoutResult; 22 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
23 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; 23 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
24 -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository;  
25 -import org.thingsboard.server.dao.sqlts.InsertLatestTsRepository; 24 +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
  25 +import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
26 import org.thingsboard.server.dao.util.PsqlTsAnyDao; 26 import org.thingsboard.server.dao.util.PsqlTsAnyDao;
27 27
28 import java.sql.PreparedStatement; 28 import java.sql.PreparedStatement;
@@ -151,4 +151,4 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple @@ -151,4 +151,4 @@ public class PsqlLatestInsertTsRepository extends AbstractInsertRepository imple
151 } 151 }
152 }); 152 });
153 } 153 }
154 -}  
  154 +}
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlInsertTsRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/PsqlInsertTsRepository.java
@@ -13,15 +13,15 @@ @@ -13,15 +13,15 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts.psql; 16 +package org.thingsboard.server.dao.sqlts.insert.psql;
17 17
18 import org.springframework.jdbc.core.BatchPreparedStatementSetter; 18 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
19 import org.springframework.stereotype.Repository; 19 import org.springframework.stereotype.Repository;
20 import org.springframework.transaction.annotation.Transactional; 20 import org.springframework.transaction.annotation.Transactional;
21 -import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity;  
22 -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository; 21 +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
  22 +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
23 import org.thingsboard.server.dao.sqlts.EntityContainer; 23 import org.thingsboard.server.dao.sqlts.EntityContainer;
24 -import org.thingsboard.server.dao.sqlts.InsertTsRepository; 24 +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
25 import org.thingsboard.server.dao.util.PsqlDao; 25 import org.thingsboard.server.dao.util.PsqlDao;
26 import org.thingsboard.server.dao.util.SqlTsDao; 26 import org.thingsboard.server.dao.util.SqlTsDao;
27 27
@@ -101,4 +101,4 @@ public class PsqlInsertTsRepository extends AbstractInsertRepository implements @@ -101,4 +101,4 @@ public class PsqlInsertTsRepository extends AbstractInsertRepository implements
101 private String getInsertOrUpdateQuery(String partitionDate) { 101 private String getInsertOrUpdateQuery(String partitionDate) {
102 return INSERT_INTO_TS_KV + partitionDate + VALUES_ON_CONFLICT_DO_UPDATE; 102 return INSERT_INTO_TS_KV + partitionDate + VALUES_ON_CONFLICT_DO_UPDATE;
103 } 103 }
104 -}  
  104 +}
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/psql/PsqlPartitioningRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/PsqlPartitioningRepository.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts.psql; 16 +package org.thingsboard.server.dao.sqlts.insert.psql;
17 17
18 import org.springframework.stereotype.Repository; 18 import org.springframework.stereotype.Repository;
19 import org.springframework.transaction.annotation.Transactional; 19 import org.springframework.transaction.annotation.Transactional;
@@ -38,4 +38,4 @@ public class PsqlPartitioningRepository { @@ -38,4 +38,4 @@ public class PsqlPartitioningRepository {
38 .executeUpdate(); 38 .executeUpdate();
39 } 39 }
40 40
41 -}  
  41 +}
dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertTsRepository.java
@@ -13,15 +13,15 @@ @@ -13,15 +13,15 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts.timescale; 16 +package org.thingsboard.server.dao.sqlts.insert.timescale;
17 17
18 import org.springframework.jdbc.core.BatchPreparedStatementSetter; 18 import org.springframework.jdbc.core.BatchPreparedStatementSetter;
19 import org.springframework.stereotype.Repository; 19 import org.springframework.stereotype.Repository;
20 import org.springframework.transaction.annotation.Transactional; 20 import org.springframework.transaction.annotation.Transactional;
21 -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity;  
22 -import org.thingsboard.server.dao.sqlts.AbstractInsertRepository; 21 +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
  22 +import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
23 import org.thingsboard.server.dao.sqlts.EntityContainer; 23 import org.thingsboard.server.dao.sqlts.EntityContainer;
24 -import org.thingsboard.server.dao.sqlts.InsertTsRepository; 24 +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
25 import org.thingsboard.server.dao.util.PsqlDao; 25 import org.thingsboard.server.dao.util.PsqlDao;
26 import org.thingsboard.server.dao.util.TimescaleDBTsDao; 26 import org.thingsboard.server.dao.util.TimescaleDBTsDao;
27 27
@@ -89,4 +89,4 @@ public class TimescaleInsertTsRepository extends AbstractInsertRepository implem @@ -89,4 +89,4 @@ public class TimescaleInsertTsRepository extends AbstractInsertRepository implem
89 } 89 }
90 }); 90 });
91 } 91 }
92 -}  
  92 +}
@@ -15,27 +15,20 @@ @@ -15,27 +15,20 @@
15 */ 15 */
16 package org.thingsboard.server.dao.sqlts.psql; 16 package org.thingsboard.server.dao.sqlts.psql;
17 17
18 -import com.google.common.util.concurrent.Futures;  
19 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
20 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
21 import org.springframework.beans.factory.annotation.Autowired; 20 import org.springframework.beans.factory.annotation.Autowired;
22 import org.springframework.beans.factory.annotation.Value; 21 import org.springframework.beans.factory.annotation.Value;
23 -import org.springframework.data.domain.PageRequest;  
24 -import org.springframework.data.domain.Sort;  
25 import org.springframework.stereotype.Component; 22 import org.springframework.stereotype.Component;
26 import org.thingsboard.server.common.data.id.EntityId; 23 import org.thingsboard.server.common.data.id.EntityId;
27 import org.thingsboard.server.common.data.id.TenantId; 24 import org.thingsboard.server.common.data.id.TenantId;
28 -import org.thingsboard.server.common.data.kv.Aggregation;  
29 -import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;  
30 -import org.thingsboard.server.common.data.kv.ReadTsKvQuery;  
31 import org.thingsboard.server.common.data.kv.TsKvEntry; 25 import org.thingsboard.server.common.data.kv.TsKvEntry;
32 -import org.thingsboard.server.dao.DaoUtil;  
33 -import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity; 26 +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
34 import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao; 27 import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao;
35 import org.thingsboard.server.dao.sqlts.EntityContainer; 28 import org.thingsboard.server.dao.sqlts.EntityContainer;
  29 +import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository;
36 import org.thingsboard.server.dao.timeseries.PsqlPartition; 30 import org.thingsboard.server.dao.timeseries.PsqlPartition;
37 import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate; 31 import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate;
38 -import org.thingsboard.server.dao.timeseries.TimeseriesDao;  
39 import org.thingsboard.server.dao.util.PsqlDao; 32 import org.thingsboard.server.dao.util.PsqlDao;
40 import org.thingsboard.server.dao.util.SqlTsDao; 33 import org.thingsboard.server.dao.util.SqlTsDao;
41 34
@@ -44,11 +37,8 @@ import java.time.LocalDateTime; @@ -44,11 +37,8 @@ import java.time.LocalDateTime;
44 import java.time.ZoneOffset; 37 import java.time.ZoneOffset;
45 import java.time.ZonedDateTime; 38 import java.time.ZonedDateTime;
46 import java.time.format.DateTimeFormatter; 39 import java.time.format.DateTimeFormatter;
47 -import java.util.ArrayList;  
48 -import java.util.List;  
49 import java.util.Map; 40 import java.util.Map;
50 import java.util.Optional; 41 import java.util.Optional;
51 -import java.util.concurrent.CompletableFuture;  
52 import java.util.concurrent.ConcurrentHashMap; 42 import java.util.concurrent.ConcurrentHashMap;
53 import java.util.concurrent.locks.ReentrantLock; 43 import java.util.concurrent.locks.ReentrantLock;
54 44
@@ -59,15 +49,12 @@ import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_STA @@ -59,15 +49,12 @@ import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_STA
59 @Slf4j 49 @Slf4j
60 @SqlTsDao 50 @SqlTsDao
61 @PsqlDao 51 @PsqlDao
62 -public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao<TsKvEntity> implements TimeseriesDao { 52 +public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDao {
63 53
64 private final Map<Long, PsqlPartition> partitions = new ConcurrentHashMap<>(); 54 private final Map<Long, PsqlPartition> partitions = new ConcurrentHashMap<>();
65 private static final ReentrantLock partitionCreationLock = new ReentrantLock(); 55 private static final ReentrantLock partitionCreationLock = new ReentrantLock();
66 56
67 @Autowired 57 @Autowired
68 - private TsKvPsqlRepository tsKvRepository;  
69 -  
70 - @Autowired  
71 private PsqlPartitioningRepository partitioningRepository; 58 private PsqlPartitioningRepository partitioningRepository;
72 59
73 private SqlTsPartitionDate tsFormat; 60 private SqlTsPartitionDate tsFormat;
@@ -110,163 +97,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @@ -110,163 +97,6 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
110 return tsQueue.add(new EntityContainer(entity, psqlPartition.getPartitionDate())); 97 return tsQueue.add(new EntityContainer(entity, psqlPartition.getPartitionDate()));
111 } 98 }
112 99
113 - @Override  
114 - public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {  
115 - return service.submit(() -> {  
116 - String strKey = query.getKey();  
117 - Integer keyId = getOrSaveKeyId(strKey);  
118 - tsKvRepository.delete(  
119 - entityId.getId(),  
120 - keyId,  
121 - query.getStartTs(),  
122 - query.getEndTs());  
123 - return null;  
124 - });  
125 - }  
126 -  
127 - @Override  
128 - public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {  
129 - return getRemoveLatestFuture(tenantId, entityId, query);  
130 - }  
131 -  
132 - @Override  
133 - public ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {  
134 - return getSaveLatestFuture(entityId, tsKvEntry);  
135 - }  
136 -  
137 - @Override  
138 - public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {  
139 - return getFindLatestFuture(entityId, key);  
140 - }  
141 -  
142 - @Override  
143 - public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {  
144 - return getFindAllLatestFuture(entityId);  
145 - }  
146 -  
147 - @Override  
148 - public ListenableFuture<Void> savePartition(TenantId tenantId, EntityId entityId, long tsKvEntryTs, String key, long ttl) {  
149 - return Futures.immediateFuture(null);  
150 - }  
151 -  
152 - @Override  
153 - public ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {  
154 - return Futures.immediateFuture(null);  
155 - }  
156 -  
157 - protected ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {  
158 - if (query.getAggregation() == Aggregation.NONE) {  
159 - return findAllAsyncWithLimit(tenantId, entityId, query);  
160 - } else {  
161 - long stepTs = query.getStartTs();  
162 - List<ListenableFuture<Optional<TsKvEntry>>> futures = new ArrayList<>();  
163 - while (stepTs < query.getEndTs()) {  
164 - long startTs = stepTs;  
165 - long endTs = stepTs + query.getInterval();  
166 - long ts = startTs + (endTs - startTs) / 2;  
167 - futures.add(findAndAggregateAsync(tenantId, entityId, query.getKey(), startTs, endTs, ts, query.getAggregation()));  
168 - stepTs = endTs;  
169 - }  
170 - return getTskvEntriesFuture(Futures.allAsList(futures));  
171 - }  
172 - }  
173 -  
174 - @Override  
175 - protected ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {  
176 - Integer keyId = getOrSaveKeyId(query.getKey());  
177 - List<TsKvEntity> tsKvEntities = tsKvRepository.findAllWithLimit(  
178 - entityId.getId(),  
179 - keyId,  
180 - query.getStartTs(),  
181 - query.getEndTs(),  
182 - new PageRequest(0, query.getLimit(),  
183 - new Sort(Sort.Direction.fromString(  
184 - query.getOrderBy()), "ts")));  
185 - tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));  
186 - return Futures.immediateFuture(DaoUtil.convertDataList(tsKvEntities));  
187 - }  
188 -  
189 - @Override  
190 - protected ListenableFuture<Optional<TsKvEntry>> findAndAggregateAsync(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {  
191 - List<CompletableFuture<TsKvEntity>> entitiesFutures = new ArrayList<>();  
192 - switchAggregation(tenantId, entityId, key, startTs, endTs, aggregation, entitiesFutures);  
193 - return Futures.transform(setFutures(entitiesFutures), entity -> {  
194 - if (entity != null && entity.isNotEmpty()) {  
195 - entity.setEntityId(entityId.getId());  
196 - entity.setStrKey(key);  
197 - entity.setTs(ts);  
198 - return Optional.of(DaoUtil.getData(entity));  
199 - } else {  
200 - return Optional.empty();  
201 - }  
202 - });  
203 - }  
204 -  
205 - @Override  
206 - protected void findCount(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
207 - Integer keyId = getOrSaveKeyId(key);  
208 - entitiesFutures.add(tsKvRepository.findCount(  
209 - entityId.getId(),  
210 - keyId,  
211 - startTs,  
212 - endTs));  
213 - }  
214 -  
215 - @Override  
216 - protected void findSum(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
217 - Integer keyId = getOrSaveKeyId(key);  
218 - entitiesFutures.add(tsKvRepository.findSum(  
219 - entityId.getId(),  
220 - keyId,  
221 - startTs,  
222 - endTs));  
223 - }  
224 -  
225 - @Override  
226 - protected void findMin(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
227 - Integer keyId = getOrSaveKeyId(key);  
228 - entitiesFutures.add(tsKvRepository.findStringMin(  
229 - entityId.getId(),  
230 - keyId,  
231 - startTs,  
232 - endTs));  
233 - entitiesFutures.add(tsKvRepository.findNumericMin(  
234 - entityId.getId(),  
235 - keyId,  
236 - startTs,  
237 - endTs));  
238 - }  
239 -  
240 - @Override  
241 - protected void findMax(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
242 - Integer keyId = getOrSaveKeyId(key);  
243 - entitiesFutures.add(tsKvRepository.findStringMax(  
244 - entityId.getId(),  
245 - keyId,  
246 - startTs,  
247 - endTs));  
248 - entitiesFutures.add(tsKvRepository.findNumericMax(  
249 - entityId.getId(),  
250 - keyId,  
251 - startTs,  
252 - endTs));  
253 - }  
254 -  
255 - @Override  
256 - protected void findAvg(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, List<CompletableFuture<TsKvEntity>> entitiesFutures) {  
257 - Integer keyId = getOrSaveKeyId(key);  
258 - entitiesFutures.add(tsKvRepository.findAvg(  
259 - entityId.getId(),  
260 - keyId,  
261 - startTs,  
262 - endTs));  
263 - }  
264 -  
265 - @Override  
266 - public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {  
267 - return processFindAllAsync(tenantId, entityId, queries);  
268 - }  
269 -  
270 private void savePartition(PsqlPartition psqlPartition) { 100 private void savePartition(PsqlPartition psqlPartition) {
271 if (!partitions.containsKey(psqlPartition.getStart())) { 101 if (!partitions.containsKey(psqlPartition.getStart())) {
272 partitionCreationLock.lock(); 102 partitionCreationLock.lock();
@@ -306,4 +136,4 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @@ -306,4 +136,4 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
306 private static long toMills(LocalDateTime time) { 136 private static long toMills(LocalDateTime time) {
307 return time.toInstant(ZoneOffset.UTC).toEpochMilli(); 137 return time.toInstant(ZoneOffset.UTC).toEpochMilli();
308 } 138 }
309 -}  
  139 +}
@@ -17,7 +17,7 @@ package org.thingsboard.server.dao.sqlts.timescale; @@ -17,7 +17,7 @@ package org.thingsboard.server.dao.sqlts.timescale;
17 17
18 import org.springframework.scheduling.annotation.Async; 18 import org.springframework.scheduling.annotation.Async;
19 import org.springframework.stereotype.Repository; 19 import org.springframework.stereotype.Repository;
20 -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; 20 +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
21 import org.thingsboard.server.dao.util.TimescaleDBTsDao; 21 import org.thingsboard.server.dao.util.TimescaleDBTsDao;
22 22
23 import javax.persistence.EntityManager; 23 import javax.persistence.EntityManager;
@@ -98,4 +98,4 @@ public class AggregationRepository { @@ -98,4 +98,4 @@ public class AggregationRepository {
98 } 98 }
99 99
100 100
101 -}  
  101 +}
@@ -31,12 +31,12 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; @@ -31,12 +31,12 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
31 import org.thingsboard.server.common.data.kv.ReadTsKvQuery; 31 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
32 import org.thingsboard.server.common.data.kv.TsKvEntry; 32 import org.thingsboard.server.common.data.kv.TsKvEntry;
33 import org.thingsboard.server.dao.DaoUtil; 33 import org.thingsboard.server.dao.DaoUtil;
34 -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; 34 +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
35 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; 35 import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
36 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; 36 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
37 import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; 37 import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
38 import org.thingsboard.server.dao.sqlts.EntityContainer; 38 import org.thingsboard.server.dao.sqlts.EntityContainer;
39 -import org.thingsboard.server.dao.sqlts.InsertTsRepository; 39 +import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
40 import org.thingsboard.server.dao.timeseries.TimeseriesDao; 40 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
41 import org.thingsboard.server.dao.util.TimescaleDBTsDao; 41 import org.thingsboard.server.dao.util.TimescaleDBTsDao;
42 42
@@ -286,4 +286,4 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements @@ -286,4 +286,4 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
286 startTs, 286 startTs,
287 endTs); 287 endTs);
288 } 288 }
289 -}  
  289 +}
@@ -21,8 +21,8 @@ import org.springframework.data.jpa.repository.Query; @@ -21,8 +21,8 @@ import org.springframework.data.jpa.repository.Query;
21 import org.springframework.data.repository.CrudRepository; 21 import org.springframework.data.repository.CrudRepository;
22 import org.springframework.data.repository.query.Param; 22 import org.springframework.data.repository.query.Param;
23 import org.springframework.transaction.annotation.Transactional; 23 import org.springframework.transaction.annotation.Transactional;
24 -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvCompositeKey;  
25 -import org.thingsboard.server.dao.model.sqlts.timescale.TimescaleTsKvEntity; 24 +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvCompositeKey;
  25 +import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
26 import org.thingsboard.server.dao.util.TimescaleDBTsDao; 26 import org.thingsboard.server.dao.util.TimescaleDBTsDao;
27 27
28 import java.util.List; 28 import java.util.List;
@@ -54,4 +54,4 @@ public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEnt @@ -54,4 +54,4 @@ public interface TsKvTimescaleRepository extends CrudRepository<TimescaleTsKvEnt
54 @Param("startTs") long startTs, 54 @Param("startTs") long startTs,
55 @Param("endTs") long endTs); 55 @Param("endTs") long endTs);
56 56
57 -}  
  57 +}
dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/TsKvRepository.java renamed from dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/TsKvPsqlRepository.java
@@ -13,7 +13,7 @@ @@ -13,7 +13,7 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 -package org.thingsboard.server.dao.sqlts.psql; 16 +package org.thingsboard.server.dao.sqlts.ts;
17 17
18 import org.springframework.data.domain.Pageable; 18 import org.springframework.data.domain.Pageable;
19 import org.springframework.data.jpa.repository.Modifying; 19 import org.springframework.data.jpa.repository.Modifying;
@@ -22,8 +22,8 @@ import org.springframework.data.repository.CrudRepository; @@ -22,8 +22,8 @@ import org.springframework.data.repository.CrudRepository;
22 import org.springframework.data.repository.query.Param; 22 import org.springframework.data.repository.query.Param;
23 import org.springframework.scheduling.annotation.Async; 23 import org.springframework.scheduling.annotation.Async;
24 import org.springframework.transaction.annotation.Transactional; 24 import org.springframework.transaction.annotation.Transactional;
25 -import org.thingsboard.server.dao.model.sqlts.psql.TsKvCompositeKey;  
26 -import org.thingsboard.server.dao.model.sqlts.psql.TsKvEntity; 25 +import org.thingsboard.server.dao.model.sqlts.ts.TsKvCompositeKey;
  26 +import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
27 import org.thingsboard.server.dao.util.SqlDao; 27 import org.thingsboard.server.dao.util.SqlDao;
28 28
29 import java.util.List; 29 import java.util.List;
@@ -31,7 +31,7 @@ import java.util.UUID; @@ -31,7 +31,7 @@ import java.util.UUID;
31 import java.util.concurrent.CompletableFuture; 31 import java.util.concurrent.CompletableFuture;
32 32
33 @SqlDao 33 @SqlDao
34 -public interface TsKvPsqlRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> { 34 +public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
35 35
36 @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " + 36 @Query("SELECT tskv FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
37 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs") 37 "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts <= :endTs")
@@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
22 import org.thingsboard.server.common.data.kv.BasicTsKvEntry; 22 import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
23 import org.thingsboard.server.common.data.kv.BooleanDataEntry; 23 import org.thingsboard.server.common.data.kv.BooleanDataEntry;
24 import org.thingsboard.server.common.data.kv.DoubleDataEntry; 24 import org.thingsboard.server.common.data.kv.DoubleDataEntry;
  25 +import org.thingsboard.server.common.data.kv.JsonDataEntry;
25 import org.thingsboard.server.common.data.kv.KvEntry; 26 import org.thingsboard.server.common.data.kv.KvEntry;
26 import org.thingsboard.server.common.data.kv.LongDataEntry; 27 import org.thingsboard.server.common.data.kv.LongDataEntry;
27 import org.thingsboard.server.common.data.kv.StringDataEntry; 28 import org.thingsboard.server.common.data.kv.StringDataEntry;
@@ -73,15 +74,28 @@ public class RestJsonConverter { @@ -73,15 +74,28 @@ public class RestJsonConverter {
73 if (!value.isObject()) { 74 if (!value.isObject()) {
74 if (value.isBoolean()) { 75 if (value.isBoolean()) {
75 return new BooleanDataEntry(key, value.asBoolean()); 76 return new BooleanDataEntry(key, value.asBoolean());
76 - } else if (value.isDouble()) {  
77 - return new DoubleDataEntry(key, value.asDouble());  
78 - } else if (value.isLong()) {  
79 - return new LongDataEntry(key, value.asLong());  
80 - } else { 77 + } else if (value.isNumber()) {
  78 + return parseNumericValue(key, value);
  79 + } else if (value.isTextual()) {
81 return new StringDataEntry(key, value.asText()); 80 return new StringDataEntry(key, value.asText());
  81 + } else {
  82 + throw new RuntimeException(CAN_T_PARSE_VALUE + value);
82 } 83 }
83 } else { 84 } else {
84 - throw new RuntimeException(CAN_T_PARSE_VALUE + value); 85 + return new JsonDataEntry(key, value.toString());
  86 + }
  87 + }
  88 +
  89 + private static KvEntry parseNumericValue(String key, JsonNode value) {
  90 + if (value.isFloatingPointNumber()) {
  91 + return new DoubleDataEntry(key, value.asDouble());
  92 + } else {
  93 + try {
  94 + long longValue = Long.parseLong(value.toString());
  95 + return new LongDataEntry(key, longValue);
  96 + } catch (NumberFormatException e) {
  97 + throw new IllegalArgumentException("Big integer values are not supported!");
  98 + }
85 } 99 }
86 } 100 }
87 } 101 }