Showing
1 changed file
with
37 additions
and
25 deletions
@@ -48,10 +48,7 @@ import java.time.LocalDateTime; | @@ -48,10 +48,7 @@ import java.time.LocalDateTime; | ||
48 | import java.time.ZoneOffset; | 48 | import java.time.ZoneOffset; |
49 | import java.time.ZonedDateTime; | 49 | import java.time.ZonedDateTime; |
50 | import java.time.format.DateTimeFormatter; | 50 | import java.time.format.DateTimeFormatter; |
51 | -import java.util.ArrayList; | ||
52 | -import java.util.List; | ||
53 | -import java.util.Optional; | ||
54 | -import java.util.Set; | 51 | +import java.util.*; |
55 | import java.util.concurrent.CompletableFuture; | 52 | import java.util.concurrent.CompletableFuture; |
56 | import java.util.concurrent.ConcurrentHashMap; | 53 | import java.util.concurrent.ConcurrentHashMap; |
57 | import java.util.concurrent.ConcurrentMap; | 54 | import java.util.concurrent.ConcurrentMap; |
@@ -67,7 +64,7 @@ import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_STA | @@ -67,7 +64,7 @@ import static org.thingsboard.server.dao.timeseries.SqlTsPartitionDate.EPOCH_STA | ||
67 | public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEntity> implements TimeseriesDao { | 64 | public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEntity> implements TimeseriesDao { |
68 | 65 | ||
69 | private final ConcurrentMap<String, Integer> tsKvDictionaryMap = new ConcurrentHashMap<>(); | 66 | private final ConcurrentMap<String, Integer> tsKvDictionaryMap = new ConcurrentHashMap<>(); |
70 | - private final Set<PsqlPartition> partitions = ConcurrentHashMap.newKeySet(); | 67 | + private final Map<Long, PsqlPartition> partitions = new ConcurrentHashMap<>(); |
71 | 68 | ||
72 | private static final ReentrantLock tsCreationLock = new ReentrantLock(); | 69 | private static final ReentrantLock tsCreationLock = new ReentrantLock(); |
73 | private static final ReentrantLock partitionCreationLock = new ReentrantLock(); | 70 | private static final ReentrantLock partitionCreationLock = new ReentrantLock(); |
@@ -82,6 +79,7 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | @@ -82,6 +79,7 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | ||
82 | private PsqlPartitioningRepository partitioningRepository; | 79 | private PsqlPartitioningRepository partitioningRepository; |
83 | 80 | ||
84 | private SqlTsPartitionDate tsFormat; | 81 | private SqlTsPartitionDate tsFormat; |
82 | + private PsqlPartition indefinitePartition; | ||
85 | 83 | ||
86 | @Value("${sql.ts_key_value_partitioning}") | 84 | @Value("${sql.ts_key_value_partitioning}") |
87 | private String partitioning; | 85 | private String partitioning; |
@@ -92,6 +90,10 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | @@ -92,6 +90,10 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | ||
92 | Optional<SqlTsPartitionDate> partition = SqlTsPartitionDate.parse(partitioning); | 90 | Optional<SqlTsPartitionDate> partition = SqlTsPartitionDate.parse(partitioning); |
93 | if (partition.isPresent()) { | 91 | if (partition.isPresent()) { |
94 | tsFormat = partition.get(); | 92 | tsFormat = partition.get(); |
93 | + if (tsFormat.equals(SqlTsPartitionDate.INDEFINITE)) { | ||
94 | + indefinitePartition = new PsqlPartition(toMills(EPOCH_START), Long.MAX_VALUE, tsFormat.getPattern()); | ||
95 | + savePartition(indefinitePartition); | ||
96 | + } | ||
95 | } else { | 97 | } else { |
96 | log.warn("Incorrect configuration of partitioning {}", partitioning); | 98 | log.warn("Incorrect configuration of partitioning {}", partitioning); |
97 | throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!"); | 99 | throw new RuntimeException("Failed to parse partitioning property: " + partitioning + "!"); |
@@ -116,23 +118,22 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | @@ -116,23 +118,22 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | ||
116 | entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); | 118 | entity.setLongValue(tsKvEntry.getLongValue().orElse(null)); |
117 | entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); | 119 | entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); |
118 | PsqlPartition psqlPartition = toPartition(tsKvEntry.getTs()); | 120 | PsqlPartition psqlPartition = toPartition(tsKvEntry.getTs()); |
119 | - savePartition(psqlPartition); | ||
120 | log.trace("Saving entity: {}", entity); | 121 | log.trace("Saving entity: {}", entity); |
121 | return tsQueue.add(new EntityContainer(entity, psqlPartition.getPartitionDate())); | 122 | return tsQueue.add(new EntityContainer(entity, psqlPartition.getPartitionDate())); |
122 | } | 123 | } |
123 | 124 | ||
124 | @Override | 125 | @Override |
125 | public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { | 126 | public ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { |
126 | - return service.submit(() -> { | ||
127 | - String strKey = query.getKey(); | ||
128 | - Integer keyId = getOrSaveKeyId(strKey); | ||
129 | - tsKvRepository.delete( | ||
130 | - entityId.getId(), | ||
131 | - keyId, | ||
132 | - query.getStartTs(), | ||
133 | - query.getEndTs()); | ||
134 | - return null; | ||
135 | - }); | 127 | + return service.submit(() -> { |
128 | + String strKey = query.getKey(); | ||
129 | + Integer keyId = getOrSaveKeyId(strKey); | ||
130 | + tsKvRepository.delete( | ||
131 | + entityId.getId(), | ||
132 | + keyId, | ||
133 | + query.getStartTs(), | ||
134 | + query.getEndTs()); | ||
135 | + return null; | ||
136 | + }); | ||
136 | } | 137 | } |
137 | 138 | ||
138 | @Override | 139 | @Override |
@@ -286,13 +287,13 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | @@ -286,13 +287,13 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | ||
286 | } | 287 | } |
287 | 288 | ||
288 | private void savePartition(PsqlPartition psqlPartition) { | 289 | private void savePartition(PsqlPartition psqlPartition) { |
289 | - if (!partitions.contains(psqlPartition)) { | 290 | + if (!partitions.containsKey(psqlPartition.getStart())) { |
290 | partitionCreationLock.lock(); | 291 | partitionCreationLock.lock(); |
291 | try { | 292 | try { |
292 | log.trace("Saving partition: {}", psqlPartition); | 293 | log.trace("Saving partition: {}", psqlPartition); |
293 | partitioningRepository.save(psqlPartition); | 294 | partitioningRepository.save(psqlPartition); |
294 | log.trace("Adding partition to Set: {}", psqlPartition); | 295 | log.trace("Adding partition to Set: {}", psqlPartition); |
295 | - partitions.add(psqlPartition); | 296 | + partitions.put(psqlPartition.getStart(), psqlPartition); |
296 | } finally { | 297 | } finally { |
297 | partitionCreationLock.unlock(); | 298 | partitionCreationLock.unlock(); |
298 | } | 299 | } |
@@ -300,17 +301,28 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | @@ -300,17 +301,28 @@ public class JpaPsqlTimeseriesDao extends AbstractSimpleSqlTimeseriesDao<TsKvEnt | ||
300 | } | 301 | } |
301 | 302 | ||
302 | private PsqlPartition toPartition(long ts) { | 303 | private PsqlPartition toPartition(long ts) { |
303 | - if (tsFormat.getTruncateUnit().equals(SqlTsPartitionDate.INDEFINITE.getTruncateUnit())) { | ||
304 | - return new PsqlPartition(toMills(EPOCH_START), Long.MAX_VALUE, tsFormat.getPattern()); | 304 | + if (tsFormat.equals(SqlTsPartitionDate.INDEFINITE)) { |
305 | + return indefinitePartition; | ||
305 | } else { | 306 | } else { |
306 | LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); | 307 | LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); |
307 | LocalDateTime localDateTimeStart = tsFormat.trancateTo(time); | 308 | LocalDateTime localDateTimeStart = tsFormat.trancateTo(time); |
308 | - LocalDateTime localDateTimeEnd = tsFormat.plusTo(localDateTimeStart); | ||
309 | - ZonedDateTime zonedDateTime = localDateTimeStart.atZone(ZoneOffset.UTC); | ||
310 | - String partitionDate = zonedDateTime.format(DateTimeFormatter.ofPattern(tsFormat.getPattern())); | ||
311 | - return new PsqlPartition(toMills(localDateTimeStart), toMills(localDateTimeEnd), partitionDate); | 309 | + long partitionStartTs = toMills(localDateTimeStart); |
310 | + PsqlPartition partition = partitions.get(partitionStartTs); | ||
311 | + if (partition != null) { | ||
312 | + return partition; | ||
313 | + } else { | ||
314 | + LocalDateTime localDateTimeEnd = tsFormat.plusTo(localDateTimeStart); | ||
315 | + long partitionEndTs = toMills(localDateTimeEnd); | ||
316 | + ZonedDateTime zonedDateTime = localDateTimeStart.atZone(ZoneOffset.UTC); | ||
317 | + String partitionDate = zonedDateTime.format(DateTimeFormatter.ofPattern(tsFormat.getPattern())); | ||
318 | + partition = new PsqlPartition(partitionStartTs, partitionEndTs, partitionDate); | ||
319 | + savePartition(partition); | ||
320 | + return partition; | ||
321 | + } | ||
312 | } | 322 | } |
313 | } | 323 | } |
314 | 324 | ||
315 | - private long toMills(LocalDateTime time) { return time.toInstant(ZoneOffset.UTC).toEpochMilli(); } | 325 | + private static long toMills(LocalDateTime time) { |
326 | + return time.toInstant(ZoneOffset.UTC).toEpochMilli(); | ||
327 | + } | ||
316 | } | 328 | } |