Commit e77127019f058a00f7c5608b565c8d7ca12d2ae7

Authored by YevhenBondarenko
1 parent 71bd6001

Ts batch improvements

... ... @@ -252,14 +252,17 @@ sql:
252 252 batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}"
253 253 batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}"
254 254 stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}"
  255 + batch_threads: "${SQL_ATTRIBUTES_BATCH_THREADS:4}"
255 256 ts:
256 257 batch_size: "${SQL_TS_BATCH_SIZE:10000}"
257 258 batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}"
258 259 stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}"
  260 + batch_threads: "${SQL_TS_BATCH_THREADS:4}"
259 261 ts_latest:
260 262 batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}"
261 263 batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}"
262 264 stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}"
  265 + batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}"
263 266 # Specify whether to remove null characters from strValue of attributes and timeseries before insert
264 267 remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}"
265 268 postgres:
... ... @@ -268,6 +271,7 @@ sql:
268 271 timescale:
269 272 # Specify Interval size for new data chunks storage.
270 273 chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}"
  274 + batch_threads: "${SQL_TIMESCALE_BATCH_THREADS:4}"
271 275 ttl:
272 276 ts:
273 277 enabled: "${SQL_TTL_TS_ENABLED:true}"
... ...
... ... @@ -41,16 +41,14 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
41 41 private final TbSqlBlockingQueueParams params;
42 42
43 43 private ExecutorService executor;
44   - private ScheduledLogExecutorComponent logExecutor;
45 44
46 45 public TbSqlBlockingQueue(TbSqlBlockingQueueParams params) {
47 46 this.params = params;
48 47 }
49 48
50 49 @Override
51   - public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) {
52   - this.logExecutor = logExecutor;
53   - executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + params.getLogName().toLowerCase()));
  50 + public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, int index) {
  51 + executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + index + "-" + params.getLogName().toLowerCase()));
54 52 executor.submit(() -> {
55 53 String logName = params.getLogName();
56 54 int batchSize = params.getBatchSize();
... ... @@ -94,7 +92,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
94 92
95 93 logExecutor.scheduleAtFixedRate(() -> {
96 94 if (queue.size() > 0 || addedCount.get() > 0 || savedCount.get() > 0 || failedCount.get() > 0) {
97   - log.info("[{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]",
  95 + log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index,
98 96 params.getLogName(), queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0));
99 97 }
100 98 }, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sql;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +import lombok.Data;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +
  22 +import java.util.List;
  23 +import java.util.concurrent.CopyOnWriteArrayList;
  24 +import java.util.function.Consumer;
  25 +import java.util.function.Function;
  26 +
  27 +@Slf4j
  28 +@Data
  29 +public class TbSqlBlockingQueueWrapper<E> {
  30 + private final CopyOnWriteArrayList<TbSqlBlockingQueue<E>> queues = new CopyOnWriteArrayList<>();
  31 + private final TbSqlBlockingQueueParams params;
  32 + private ScheduledLogExecutorComponent logExecutor;
  33 + private final Function<E, Integer> hashCodeFunction;
  34 + private final int maxThreads;
  35 +
  36 + public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) {
  37 + for (int i = 0; i < maxThreads; i++) {
  38 + TbSqlBlockingQueue<E> queue = new TbSqlBlockingQueue<>(params);
  39 + queues.add(queue);
  40 + queue.init(logExecutor, saveFunction, i);
  41 + }
  42 + }
  43 +
  44 + public ListenableFuture<Void> add(E element) {
  45 + int hash = hashCodeFunction.apply(element);
  46 + int queueIndex = (hash & 0x7FFFFFFF) % maxThreads;
  47 + return queues.get(queueIndex).add(element);
  48 + }
  49 +
  50 + public void destroy() {
  51 + queues.forEach(TbSqlBlockingQueue::destroy);
  52 + }
  53 +}
... ...
... ... @@ -22,7 +22,7 @@ import java.util.function.Consumer;
22 22
23 23 public interface TbSqlQueue<E> {
24 24
25   - void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction);
  25 + void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, int queueIndex);
26 26
27 27 void destroy();
28 28
... ...
... ... @@ -32,8 +32,8 @@ import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
32 32 import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
33 33 import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
34 34 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
35   -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
36 35 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  36 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
37 37 import org.thingsboard.server.dao.util.SqlDao;
38 38
39 39 import javax.annotation.PostConstruct;
... ... @@ -41,6 +41,7 @@ import javax.annotation.PreDestroy;
41 41 import java.util.Collection;
42 42 import java.util.List;
43 43 import java.util.Optional;
  44 +import java.util.function.Function;
44 45 import java.util.stream.Collectors;
45 46
46 47 import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
... ... @@ -68,7 +69,10 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
68 69 @Value("${sql.attributes.stats_print_interval_ms:1000}")
69 70 private long statsPrintIntervalMs;
70 71
71   - private TbSqlBlockingQueue<AttributeKvEntity> queue;
  72 + @Value("${sql.attributes.batch_threads:4}")
  73 + private int batchThreads;
  74 +
  75 + private TbSqlBlockingQueueWrapper<AttributeKvEntity> queue;
72 76
73 77 @PostConstruct
74 78 private void init() {
... ... @@ -78,7 +82,9 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
78 82 .maxDelay(maxDelay)
79 83 .statsPrintIntervalMs(statsPrintIntervalMs)
80 84 .build();
81   - queue = new TbSqlBlockingQueue<>(params);
  85 +
  86 + Function<AttributeKvEntity, Integer> hashcodeFunction = entity -> entity != null ? entity.getId().getEntityId().hashCode() : 0;
  87 + queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads);
82 88 queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v));
83 89 }
84 90
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sqlts;
17 17
18 18 import com.google.common.util.concurrent.Futures;
19 19 import com.google.common.util.concurrent.ListenableFuture;
  20 +import com.google.common.util.concurrent.ListeningExecutorService;
20 21 import com.google.common.util.concurrent.MoreExecutors;
21 22 import com.google.common.util.concurrent.SettableFuture;
22 23 import lombok.extern.slf4j.Slf4j;
... ... @@ -31,8 +32,8 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
31 32 import org.thingsboard.server.common.data.kv.TsKvEntry;
32 33 import org.thingsboard.server.dao.DaoUtil;
33 34 import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
34   -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
35 35 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  36 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
36 37 import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
37 38 import org.thingsboard.server.dao.sqlts.ts.TsKvRepository;
38 39 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
... ... @@ -43,6 +44,7 @@ import java.util.ArrayList;
43 44 import java.util.List;
44 45 import java.util.Optional;
45 46 import java.util.concurrent.CompletableFuture;
  47 +import java.util.function.Function;
46 48 import java.util.stream.Collectors;
47 49
48 50 @Slf4j
... ... @@ -54,7 +56,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
54 56 @Autowired
55 57 protected InsertTsRepository<TsKvEntity> insertRepository;
56 58
57   - protected TbSqlBlockingQueue<TsKvEntity> tsQueue;
  59 + protected TbSqlBlockingQueueWrapper<TsKvEntity> tsQueue;
58 60
59 61 @PostConstruct
60 62 protected void init() {
... ... @@ -65,7 +67,9 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
65 67 .maxDelay(tsMaxDelay)
66 68 .statsPrintIntervalMs(tsStatsPrintIntervalMs)
67 69 .build();
68   - tsQueue = new TbSqlBlockingQueue<>(tsParams);
  70 +
  71 + Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity != null ? entity.getEntityId().hashCode() : 0;
  72 + tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads);
69 73 tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
70 74 }
71 75
... ...
... ... @@ -41,8 +41,8 @@ import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestCompositeKey;
41 41 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
42 42 import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
43 43 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
44   -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
45 44 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  45 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
46 46 import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository;
47 47 import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
48 48 import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
... ... @@ -52,7 +52,9 @@ import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
52 52 import javax.annotation.Nullable;
53 53 import javax.annotation.PostConstruct;
54 54 import javax.annotation.PreDestroy;
  55 +import java.util.Comparator;
55 56 import java.util.List;
  57 +import java.util.Map;
56 58 import java.util.Objects;
57 59 import java.util.Optional;
58 60 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -82,7 +84,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
82 84 @Autowired
83 85 private TsKvDictionaryRepository dictionaryRepository;
84 86
85   - private TbSqlBlockingQueue<TsKvLatestEntity> tsLatestQueue;
  87 + private TbSqlBlockingQueueWrapper<TsKvLatestEntity> tsLatestQueue;
86 88
87 89 @Value("${sql.ts_latest.batch_size:1000}")
88 90 private int tsLatestBatchSize;
... ... @@ -93,6 +95,9 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
93 95 @Value("${sql.ts_latest.stats_print_interval_ms:1000}")
94 96 private long tsLatestStatsPrintIntervalMs;
95 97
  98 + @Value("${sql.ts_latest.batch_threads:4}")
  99 + private int tsLatestBatchThreads;
  100 +
96 101 @Autowired
97 102 protected ScheduledLogExecutorComponent logExecutor;
98 103
... ... @@ -105,6 +110,12 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
105 110 @Value("${sql.ts.stats_print_interval_ms:1000}")
106 111 protected long tsStatsPrintIntervalMs;
107 112
  113 + @Value("${sql.ts.batch_threads:4}")
  114 + protected int tsBatchThreads;
  115 +
  116 + @Value("${sql.timescale.batch_threads:4}")
  117 + protected int timescaleBatchThreads;
  118 +
108 119 @PostConstruct
109 120 protected void init() {
110 121 TbSqlBlockingQueueParams tsLatestParams = TbSqlBlockingQueueParams.builder()
... ... @@ -113,8 +124,23 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
113 124 .maxDelay(tsLatestMaxDelay)
114 125 .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs)
115 126 .build();
116   - tsLatestQueue = new TbSqlBlockingQueue<>(tsLatestParams);
117   - tsLatestQueue.init(logExecutor, v -> insertLatestTsRepository.saveOrUpdate(v));
  127 +
  128 + java.util.function.Function<TsKvLatestEntity, Integer> hashcodeFunction = entity -> entity != null ? entity.getEntityId().hashCode() : 0;
  129 + tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads);
  130 +
  131 + tsLatestQueue.init(logExecutor, v -> {
  132 + Map<TsKey, List<TsKvLatestEntity>> tsMap =
  133 + v.stream().collect(Collectors.groupingBy(ts -> new TsKey(ts.getEntityId(), ts.getStrKey())));
  134 +
  135 + List<TsKvLatestEntity> latestEntities =
  136 + tsMap.keySet()
  137 + .stream()
  138 + .map(tsMap::get)
  139 + .map(list -> list.stream().max(Comparator.comparing(TsKvLatestEntity::getTs)).get())
  140 + .collect(Collectors.toList());
  141 +
  142 + insertLatestTsRepository.saveOrUpdate(latestEntities);
  143 + });
118 144 }
119 145
120 146 @PreDestroy
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sqlts;
  17 +
  18 +import lombok.Data;
  19 +
  20 +import java.util.UUID;
  21 +
  22 +@Data
  23 +public class TsKey {
  24 + private final UUID entityId;
  25 + private final String key;
  26 +}
... ...
... ... @@ -33,8 +33,8 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
33 33 import org.thingsboard.server.common.data.kv.TsKvEntry;
34 34 import org.thingsboard.server.dao.DaoUtil;
35 35 import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
36   -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
37 36 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  37 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
38 38 import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
39 39 import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
40 40 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
... ... @@ -48,6 +48,7 @@ import java.util.List;
48 48 import java.util.Optional;
49 49 import java.util.UUID;
50 50 import java.util.concurrent.CompletableFuture;
  51 +import java.util.function.Function;
51 52
52 53 @Component
53 54 @Slf4j
... ... @@ -63,7 +64,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
63 64 @Autowired
64 65 protected InsertTsRepository<TimescaleTsKvEntity> insertRepository;
65 66
66   - protected TbSqlBlockingQueue<TimescaleTsKvEntity> tsQueue;
  67 + protected TbSqlBlockingQueueWrapper<TimescaleTsKvEntity> tsQueue;
67 68
68 69 @PostConstruct
69 70 protected void init() {
... ... @@ -74,7 +75,10 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
74 75 .maxDelay(tsMaxDelay)
75 76 .statsPrintIntervalMs(tsStatsPrintIntervalMs)
76 77 .build();
77   - tsQueue = new TbSqlBlockingQueue<>(tsParams);
  78 +
  79 + Function<TimescaleTsKvEntity, Integer> hashcodeFunction = entity -> entity != null ? entity.getEntityId().hashCode() : 0;
  80 + tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads);
  81 +
78 82 tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
79 83 }
80 84
... ...