Commit 2f6cc3763ac6c340231d038b280e1fc2e580c987
1 parent
ca7ffd99
Configurable timeseries insert service for SQL database type.
Showing
4 changed files
with
88 additions
and
3 deletions
@@ -157,6 +157,13 @@ cassandra: | @@ -157,6 +157,13 @@ cassandra: | ||
157 | # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS | 157 | # Specify partitioning size for timestamp key-value storage. Example MINUTES, HOURS, DAYS, MONTHS |
158 | ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" | 158 | ts_key_value_partitioning: "${TS_KV_PARTITIONING:MONTHS}" |
159 | 159 | ||
160 | +# SQL configuration parameters | ||
161 | +sql: | ||
162 | + # Specify executor service type used to perform timeseries insert tasks: SINGLE FIXED CACHED | ||
163 | + ts_inserts_executor_type: "${SQL_TS_INSERTS_EXECUTOR_TYPE:fixed}" | ||
164 | + # Specify thread pool size for FIXED executor service type | ||
165 | + ts_inserts_fixed_thread_pool_size: "${SQL_TS_INSERTS_FIXED_THREAD_POOL_SIZE:10}" | ||
166 | + | ||
160 | # Actor system parameters | 167 | # Actor system parameters |
161 | actors: | 168 | actors: |
162 | tenant: | 169 | tenant: |
@@ -20,6 +20,7 @@ import com.google.common.collect.Lists; | @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; | ||
20 | import com.google.common.util.concurrent.*; | 20 | import com.google.common.util.concurrent.*; |
21 | import lombok.extern.slf4j.Slf4j; | 21 | import lombok.extern.slf4j.Slf4j; |
22 | import org.springframework.beans.factory.annotation.Autowired; | 22 | import org.springframework.beans.factory.annotation.Autowired; |
23 | +import org.springframework.beans.factory.annotation.Value; | ||
23 | import org.springframework.data.domain.PageRequest; | 24 | import org.springframework.data.domain.PageRequest; |
24 | import org.springframework.stereotype.Component; | 25 | import org.springframework.stereotype.Component; |
25 | import org.thingsboard.server.common.data.UUIDConverter; | 26 | import org.thingsboard.server.common.data.UUIDConverter; |
@@ -31,14 +32,17 @@ import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey; | @@ -31,14 +32,17 @@ import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey; | ||
31 | import org.thingsboard.server.dao.model.sql.TsKvLatestEntity; | 32 | import org.thingsboard.server.dao.model.sql.TsKvLatestEntity; |
32 | import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; | 33 | import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; |
33 | import org.thingsboard.server.dao.timeseries.TimeseriesDao; | 34 | import org.thingsboard.server.dao.timeseries.TimeseriesDao; |
35 | +import org.thingsboard.server.dao.timeseries.TsInsertExecutorType; | ||
34 | import org.thingsboard.server.dao.util.SqlDao; | 36 | import org.thingsboard.server.dao.util.SqlDao; |
35 | 37 | ||
36 | import javax.annotation.Nullable; | 38 | import javax.annotation.Nullable; |
39 | +import javax.annotation.PostConstruct; | ||
37 | import javax.annotation.PreDestroy; | 40 | import javax.annotation.PreDestroy; |
38 | import java.util.ArrayList; | 41 | import java.util.ArrayList; |
39 | import java.util.List; | 42 | import java.util.List; |
40 | import java.util.Optional; | 43 | import java.util.Optional; |
41 | import java.util.concurrent.CompletableFuture; | 44 | import java.util.concurrent.CompletableFuture; |
45 | +import java.util.concurrent.ExecutorService; | ||
42 | import java.util.concurrent.Executors; | 46 | import java.util.concurrent.Executors; |
43 | import java.util.stream.Collectors; | 47 | import java.util.stream.Collectors; |
44 | 48 | ||
@@ -50,7 +54,13 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID; | @@ -50,7 +54,13 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID; | ||
50 | @SqlDao | 54 | @SqlDao |
51 | public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao { | 55 | public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao { |
52 | 56 | ||
53 | - private ListeningExecutorService insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); | 57 | + @Value("${sql.ts_inserts_executor_type}") |
58 | + private String insertExecutorType; | ||
59 | + | ||
60 | + @Value("${sql.ts_inserts_fixed_thread_pool_size}") | ||
61 | + private int insertFixedThreadPoolSize; | ||
62 | + | ||
63 | + private ListeningExecutorService insertService; | ||
54 | 64 | ||
55 | @Autowired | 65 | @Autowired |
56 | private TsKvRepository tsKvRepository; | 66 | private TsKvRepository tsKvRepository; |
@@ -58,6 +68,32 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp | @@ -58,6 +68,32 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp | ||
58 | @Autowired | 68 | @Autowired |
59 | private TsKvLatestRepository tsKvLatestRepository; | 69 | private TsKvLatestRepository tsKvLatestRepository; |
60 | 70 | ||
71 | + @PostConstruct | ||
72 | + public void init() { | ||
73 | + Optional<TsInsertExecutorType> executorTypeOptional = TsInsertExecutorType.parse(insertExecutorType); | ||
74 | + TsInsertExecutorType executorType; | ||
75 | + if (executorTypeOptional.isPresent()) { | ||
76 | + executorType = executorTypeOptional.get(); | ||
77 | + } else { | ||
78 | + executorType = TsInsertExecutorType.FIXED; | ||
79 | + } | ||
80 | + switch (executorType) { | ||
81 | + case SINGLE: | ||
82 | + insertService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); | ||
83 | + break; | ||
84 | + case FIXED: | ||
85 | + int poolSize = insertFixedThreadPoolSize; | ||
86 | + if (poolSize <= 0) { | ||
87 | + poolSize = 10; | ||
88 | + } | ||
89 | + insertService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(poolSize)); | ||
90 | + break; | ||
91 | + case CACHED: | ||
92 | + insertService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); | ||
93 | + break; | ||
94 | + } | ||
95 | + } | ||
96 | + | ||
61 | @Override | 97 | @Override |
62 | public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) { | 98 | public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) { |
63 | List<ListenableFuture<List<TsKvEntry>>> futures = queries | 99 | List<ListenableFuture<List<TsKvEntry>>> futures = queries |
@@ -265,7 +301,9 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp | @@ -265,7 +301,9 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp | ||
265 | 301 | ||
266 | @PreDestroy | 302 | @PreDestroy |
267 | void onDestroy() { | 303 | void onDestroy() { |
268 | - insertService.shutdown(); | 304 | + if (insertService != null) { |
305 | + insertService.shutdown(); | ||
306 | + } | ||
269 | } | 307 | } |
270 | 308 | ||
271 | } | 309 | } |
1 | +/** | ||
2 | + * Copyright © 2016-2017 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.dao.timeseries; | ||
17 | + | ||
18 | +import java.util.Optional; | ||
19 | + | ||
20 | +public enum TsInsertExecutorType { | ||
21 | + SINGLE, | ||
22 | + FIXED, | ||
23 | + CACHED; | ||
24 | + | ||
25 | + public static Optional<TsInsertExecutorType> parse(String name) { | ||
26 | + TsInsertExecutorType executorType = null; | ||
27 | + if (name != null) { | ||
28 | + for (TsInsertExecutorType type : TsInsertExecutorType.values()) { | ||
29 | + if (type.name().equalsIgnoreCase(name)) { | ||
30 | + executorType = type; | ||
31 | + break; | ||
32 | + } | ||
33 | + } | ||
34 | + } | ||
35 | + return Optional.of(executorType); | ||
36 | + } | ||
37 | +} |