Commit 8808c268aeab07eb2bce9cd668a9e4d684f6581c
Committed by
GitHub
Merge pull request #3076 from YevhenBondarenko/develop/2.5.3-batch-improvements
Develop/2.5.3 batch improvements
Showing
9 changed files
with
140 additions
and
19 deletions
... | ... | @@ -252,14 +252,17 @@ sql: |
252 | 252 | batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" |
253 | 253 | batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" |
254 | 254 | stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}" |
255 | + batch_threads: "${SQL_ATTRIBUTES_BATCH_THREADS:4}" | |
255 | 256 | ts: |
256 | 257 | batch_size: "${SQL_TS_BATCH_SIZE:10000}" |
257 | 258 | batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}" |
258 | 259 | stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" |
260 | + batch_threads: "${SQL_TS_BATCH_THREADS:4}" | |
259 | 261 | ts_latest: |
260 | 262 | batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}" |
261 | 263 | batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}" |
262 | 264 | stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}" |
265 | + batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}" | |
263 | 266 | # Specify whether to remove null characters from strValue of attributes and timeseries before insert |
264 | 267 | remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}" |
265 | 268 | postgres: |
... | ... | @@ -268,6 +271,7 @@ sql: |
268 | 271 | timescale: |
269 | 272 | # Specify Interval size for new data chunks storage. |
270 | 273 | chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}" |
274 | + batch_threads: "${SQL_TIMESCALE_BATCH_THREADS:4}" | |
271 | 275 | ttl: |
272 | 276 | ts: |
273 | 277 | enabled: "${SQL_TTL_TS_ENABLED:true}" | ... | ... |
... | ... | @@ -41,16 +41,14 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> { |
41 | 41 | private final TbSqlBlockingQueueParams params; |
42 | 42 | |
43 | 43 | private ExecutorService executor; |
44 | - private ScheduledLogExecutorComponent logExecutor; | |
45 | 44 | |
46 | 45 | public TbSqlBlockingQueue(TbSqlBlockingQueueParams params) { |
47 | 46 | this.params = params; |
48 | 47 | } |
49 | 48 | |
50 | 49 | @Override |
51 | - public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) { | |
52 | - this.logExecutor = logExecutor; | |
53 | - executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + params.getLogName().toLowerCase())); | |
50 | + public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, int index) { | |
51 | + executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + index + "-" + params.getLogName().toLowerCase())); | |
54 | 52 | executor.submit(() -> { |
55 | 53 | String logName = params.getLogName(); |
56 | 54 | int batchSize = params.getBatchSize(); |
... | ... | @@ -94,7 +92,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> { |
94 | 92 | |
95 | 93 | logExecutor.scheduleAtFixedRate(() -> { |
96 | 94 | if (queue.size() > 0 || addedCount.get() > 0 || savedCount.get() > 0 || failedCount.get() > 0) { |
97 | - log.info("[{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", | |
95 | + log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index, | |
98 | 96 | params.getLogName(), queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0)); |
99 | 97 | } |
100 | 98 | }, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS); | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.sql; | |
17 | + | |
18 | +import com.google.common.util.concurrent.ListenableFuture; | |
19 | +import lombok.Data; | |
20 | +import lombok.extern.slf4j.Slf4j; | |
21 | + | |
22 | +import java.util.List; | |
23 | +import java.util.concurrent.CopyOnWriteArrayList; | |
24 | +import java.util.function.Consumer; | |
25 | +import java.util.function.Function; | |
26 | + | |
27 | +@Slf4j | |
28 | +@Data | |
29 | +public class TbSqlBlockingQueueWrapper<E> { | |
30 | + private final CopyOnWriteArrayList<TbSqlBlockingQueue<E>> queues = new CopyOnWriteArrayList<>(); | |
31 | + private final TbSqlBlockingQueueParams params; | |
32 | + private ScheduledLogExecutorComponent logExecutor; | |
33 | + private final Function<E, Integer> hashCodeFunction; | |
34 | + private final int maxThreads; | |
35 | + | |
36 | + public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) { | |
37 | + for (int i = 0; i < maxThreads; i++) { | |
38 | + TbSqlBlockingQueue<E> queue = new TbSqlBlockingQueue<>(params); | |
39 | + queues.add(queue); | |
40 | + queue.init(logExecutor, saveFunction, i); | |
41 | + } | |
42 | + } | |
43 | + | |
44 | + public ListenableFuture<Void> add(E element) { | |
45 | + int queueIndex = element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0; | |
46 | + return queues.get(queueIndex).add(element); | |
47 | + } | |
48 | + | |
49 | + public void destroy() { | |
50 | + queues.forEach(TbSqlBlockingQueue::destroy); | |
51 | + } | |
52 | +} | ... | ... |
... | ... | @@ -22,7 +22,7 @@ import java.util.function.Consumer; |
22 | 22 | |
23 | 23 | public interface TbSqlQueue<E> { |
24 | 24 | |
25 | - void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction); | |
25 | + void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, int queueIndex); | |
26 | 26 | |
27 | 27 | void destroy(); |
28 | 28 | ... | ... |
... | ... | @@ -32,8 +32,8 @@ import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey; |
32 | 32 | import org.thingsboard.server.dao.model.sql.AttributeKvEntity; |
33 | 33 | import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; |
34 | 34 | import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; |
35 | -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; | |
36 | 35 | import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; |
36 | +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; | |
37 | 37 | import org.thingsboard.server.dao.util.SqlDao; |
38 | 38 | |
39 | 39 | import javax.annotation.PostConstruct; |
... | ... | @@ -41,6 +41,7 @@ import javax.annotation.PreDestroy; |
41 | 41 | import java.util.Collection; |
42 | 42 | import java.util.List; |
43 | 43 | import java.util.Optional; |
44 | +import java.util.function.Function; | |
44 | 45 | import java.util.stream.Collectors; |
45 | 46 | |
46 | 47 | import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID; |
... | ... | @@ -68,7 +69,10 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl |
68 | 69 | @Value("${sql.attributes.stats_print_interval_ms:1000}") |
69 | 70 | private long statsPrintIntervalMs; |
70 | 71 | |
71 | - private TbSqlBlockingQueue<AttributeKvEntity> queue; | |
72 | + @Value("${sql.attributes.batch_threads:4}") | |
73 | + private int batchThreads; | |
74 | + | |
75 | + private TbSqlBlockingQueueWrapper<AttributeKvEntity> queue; | |
72 | 76 | |
73 | 77 | @PostConstruct |
74 | 78 | private void init() { |
... | ... | @@ -78,7 +82,9 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl |
78 | 82 | .maxDelay(maxDelay) |
79 | 83 | .statsPrintIntervalMs(statsPrintIntervalMs) |
80 | 84 | .build(); |
81 | - queue = new TbSqlBlockingQueue<>(params); | |
85 | + | |
86 | + Function<AttributeKvEntity, Integer> hashcodeFunction = entity -> entity.getId().getEntityId().hashCode(); | |
87 | + queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads); | |
82 | 88 | queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v)); |
83 | 89 | } |
84 | 90 | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sqlts; |
17 | 17 | |
18 | 18 | import com.google.common.util.concurrent.Futures; |
19 | 19 | import com.google.common.util.concurrent.ListenableFuture; |
20 | +import com.google.common.util.concurrent.ListeningExecutorService; | |
20 | 21 | import com.google.common.util.concurrent.MoreExecutors; |
21 | 22 | import com.google.common.util.concurrent.SettableFuture; |
22 | 23 | import lombok.extern.slf4j.Slf4j; |
... | ... | @@ -31,8 +32,8 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery; |
31 | 32 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
32 | 33 | import org.thingsboard.server.dao.DaoUtil; |
33 | 34 | import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; |
34 | -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; | |
35 | 35 | import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; |
36 | +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; | |
36 | 37 | import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; |
37 | 38 | import org.thingsboard.server.dao.sqlts.ts.TsKvRepository; |
38 | 39 | import org.thingsboard.server.dao.timeseries.TimeseriesDao; |
... | ... | @@ -43,6 +44,7 @@ import java.util.ArrayList; |
43 | 44 | import java.util.List; |
44 | 45 | import java.util.Optional; |
45 | 46 | import java.util.concurrent.CompletableFuture; |
47 | +import java.util.function.Function; | |
46 | 48 | import java.util.stream.Collectors; |
47 | 49 | |
48 | 50 | @Slf4j |
... | ... | @@ -54,7 +56,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq |
54 | 56 | @Autowired |
55 | 57 | protected InsertTsRepository<TsKvEntity> insertRepository; |
56 | 58 | |
57 | - protected TbSqlBlockingQueue<TsKvEntity> tsQueue; | |
59 | + protected TbSqlBlockingQueueWrapper<TsKvEntity> tsQueue; | |
58 | 60 | |
59 | 61 | @PostConstruct |
60 | 62 | protected void init() { |
... | ... | @@ -65,7 +67,9 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq |
65 | 67 | .maxDelay(tsMaxDelay) |
66 | 68 | .statsPrintIntervalMs(tsStatsPrintIntervalMs) |
67 | 69 | .build(); |
68 | - tsQueue = new TbSqlBlockingQueue<>(tsParams); | |
70 | + | |
71 | + Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode(); | |
72 | + tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads); | |
69 | 73 | tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v)); |
70 | 74 | } |
71 | 75 | ... | ... |
... | ... | @@ -41,8 +41,8 @@ import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestCompositeKey; |
41 | 41 | import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; |
42 | 42 | import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; |
43 | 43 | import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; |
44 | -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; | |
45 | 44 | import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; |
45 | +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; | |
46 | 46 | import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository; |
47 | 47 | import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; |
48 | 48 | import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository; |
... | ... | @@ -52,7 +52,10 @@ import org.thingsboard.server.dao.timeseries.SimpleListenableFuture; |
52 | 52 | import javax.annotation.Nullable; |
53 | 53 | import javax.annotation.PostConstruct; |
54 | 54 | import javax.annotation.PreDestroy; |
55 | +import java.util.ArrayList; | |
56 | +import java.util.HashMap; | |
55 | 57 | import java.util.List; |
58 | +import java.util.Map; | |
56 | 59 | import java.util.Objects; |
57 | 60 | import java.util.Optional; |
58 | 61 | import java.util.concurrent.ConcurrentHashMap; |
... | ... | @@ -82,7 +85,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx |
82 | 85 | @Autowired |
83 | 86 | private TsKvDictionaryRepository dictionaryRepository; |
84 | 87 | |
85 | - private TbSqlBlockingQueue<TsKvLatestEntity> tsLatestQueue; | |
88 | + private TbSqlBlockingQueueWrapper<TsKvLatestEntity> tsLatestQueue; | |
86 | 89 | |
87 | 90 | @Value("${sql.ts_latest.batch_size:1000}") |
88 | 91 | private int tsLatestBatchSize; |
... | ... | @@ -93,6 +96,9 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx |
93 | 96 | @Value("${sql.ts_latest.stats_print_interval_ms:1000}") |
94 | 97 | private long tsLatestStatsPrintIntervalMs; |
95 | 98 | |
99 | + @Value("${sql.ts_latest.batch_threads:4}") | |
100 | + private int tsLatestBatchThreads; | |
101 | + | |
96 | 102 | @Autowired |
97 | 103 | protected ScheduledLogExecutorComponent logExecutor; |
98 | 104 | |
... | ... | @@ -105,6 +111,12 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx |
105 | 111 | @Value("${sql.ts.stats_print_interval_ms:1000}") |
106 | 112 | protected long tsStatsPrintIntervalMs; |
107 | 113 | |
114 | + @Value("${sql.ts.batch_threads:4}") | |
115 | + protected int tsBatchThreads; | |
116 | + | |
117 | + @Value("${sql.timescale.batch_threads:4}") | |
118 | + protected int timescaleBatchThreads; | |
119 | + | |
108 | 120 | @PostConstruct |
109 | 121 | protected void init() { |
110 | 122 | TbSqlBlockingQueueParams tsLatestParams = TbSqlBlockingQueueParams.builder() |
... | ... | @@ -113,8 +125,22 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx |
113 | 125 | .maxDelay(tsLatestMaxDelay) |
114 | 126 | .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs) |
115 | 127 | .build(); |
116 | - tsLatestQueue = new TbSqlBlockingQueue<>(tsLatestParams); | |
117 | - tsLatestQueue.init(logExecutor, v -> insertLatestTsRepository.saveOrUpdate(v)); | |
128 | + | |
129 | + java.util.function.Function<TsKvLatestEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode(); | |
130 | + tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads); | |
131 | + | |
132 | + tsLatestQueue.init(logExecutor, v -> { | |
133 | + Map<TsKey, TsKvLatestEntity> trueLatest = new HashMap<>(); | |
134 | + v.forEach(ts -> { | |
135 | + TsKey key = new TsKey(ts.getEntityId(), ts.getKey()); | |
136 | + TsKvLatestEntity old = trueLatest.get(key); | |
137 | + if (old == null || old.getTs() < ts.getTs()) { | |
138 | + trueLatest.put(key, ts); | |
139 | + } | |
140 | + }); | |
141 | + List<TsKvLatestEntity> latestEntities = new ArrayList<>(trueLatest.values()); | |
142 | + insertLatestTsRepository.saveOrUpdate(latestEntities); | |
143 | + }); | |
118 | 144 | } |
119 | 145 | |
120 | 146 | @PreDestroy | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.sqlts; | |
17 | + | |
18 | +import lombok.Data; | |
19 | + | |
20 | +import java.util.UUID; | |
21 | + | |
22 | +@Data | |
23 | +public class TsKey { | |
24 | + private final UUID entityId; | |
25 | + private final int key; | |
26 | +} | ... | ... |
... | ... | @@ -33,8 +33,8 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery; |
33 | 33 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
34 | 34 | import org.thingsboard.server.dao.DaoUtil; |
35 | 35 | import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; |
36 | -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; | |
37 | 36 | import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; |
37 | +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; | |
38 | 38 | import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; |
39 | 39 | import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository; |
40 | 40 | import org.thingsboard.server.dao.timeseries.TimeseriesDao; |
... | ... | @@ -48,6 +48,7 @@ import java.util.List; |
48 | 48 | import java.util.Optional; |
49 | 49 | import java.util.UUID; |
50 | 50 | import java.util.concurrent.CompletableFuture; |
51 | +import java.util.function.Function; | |
51 | 52 | |
52 | 53 | @Component |
53 | 54 | @Slf4j |
... | ... | @@ -63,7 +64,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements |
63 | 64 | @Autowired |
64 | 65 | protected InsertTsRepository<TimescaleTsKvEntity> insertRepository; |
65 | 66 | |
66 | - protected TbSqlBlockingQueue<TimescaleTsKvEntity> tsQueue; | |
67 | + protected TbSqlBlockingQueueWrapper<TimescaleTsKvEntity> tsQueue; | |
67 | 68 | |
68 | 69 | @PostConstruct |
69 | 70 | protected void init() { |
... | ... | @@ -74,7 +75,10 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements |
74 | 75 | .maxDelay(tsMaxDelay) |
75 | 76 | .statsPrintIntervalMs(tsStatsPrintIntervalMs) |
76 | 77 | .build(); |
77 | - tsQueue = new TbSqlBlockingQueue<>(tsParams); | |
78 | + | |
79 | + Function<TimescaleTsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode(); | |
80 | + tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads); | |
81 | + | |
78 | 82 | tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v)); |
79 | 83 | } |
80 | 84 | |
... | ... | @@ -277,4 +281,5 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements |
277 | 281 | startTs, |
278 | 282 | endTs); |
279 | 283 | } |
284 | + | |
280 | 285 | } | ... | ... |