Commit c2e36851fc117db1ad8828ee7a91d534b6b14e2f

Authored by Andrii Shvaika
2 parents 749664bc 8808c268

Merge with 2.5.3

Showing 47 changed files with 787 additions and 160 deletions

Too many changes to show.

To preserve performance only 47 of 76 files are displayed.

... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>thingsboard</artifactId>
25 25 </parent>
26 26 <artifactId>application</artifactId>
... ... @@ -309,6 +309,18 @@
309 309 <artifactId>Java-WebSocket</artifactId>
310 310 <scope>test</scope>
311 311 </dependency>
  312 + <dependency>
  313 + <groupId>org.springframework.boot</groupId>
  314 + <artifactId>spring-boot-starter-actuator</artifactId>
  315 + </dependency>
  316 + <dependency>
  317 + <groupId>io.micrometer</groupId>
  318 + <artifactId>micrometer-core</artifactId>
  319 + </dependency>
  320 + <dependency>
  321 + <groupId>io.micrometer</groupId>
  322 + <artifactId>micrometer-registry-prometheus</artifactId>
  323 + </dependency>
312 324 </dependencies>
313 325
314 326 <build>
... ...
... ... @@ -220,6 +220,10 @@ public class ActorSystemContext {
220 220 @Getter
221 221 private ClaimDevicesService claimDevicesService;
222 222
  223 + @Autowired
  224 + @Getter
  225 + private JsInvokeStats jsInvokeStats;
  226 +
223 227 //TODO: separate context for TbCore and TbRuleEngine
224 228 @Autowired(required = false)
225 229 @Getter
... ... @@ -273,19 +277,14 @@ public class ActorSystemContext {
273 277 @Getter
274 278 private long statisticsPersistFrequency;
275 279
276   - @Getter
277   - private final AtomicInteger jsInvokeRequestsCount = new AtomicInteger(0);
278   - @Getter
279   - private final AtomicInteger jsInvokeResponsesCount = new AtomicInteger(0);
280   - @Getter
281   - private final AtomicInteger jsInvokeFailuresCount = new AtomicInteger(0);
282 280
283 281 @Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}")
284 282 public void printStats() {
285 283 if (statisticsEnabled) {
286   - if (jsInvokeRequestsCount.get() > 0 || jsInvokeResponsesCount.get() > 0 || jsInvokeFailuresCount.get() > 0) {
  284 + if (jsInvokeStats.getRequests() > 0 || jsInvokeStats.getResponses() > 0 || jsInvokeStats.getFailures() > 0) {
287 285 log.info("Rule Engine JS Invoke Stats: requests [{}] responses [{}] failures [{}]",
288   - jsInvokeRequestsCount.getAndSet(0), jsInvokeResponsesCount.getAndSet(0), jsInvokeFailuresCount.getAndSet(0));
  286 + jsInvokeStats.getRequests(), jsInvokeStats.getResponses(), jsInvokeStats.getFailures());
  287 + jsInvokeStats.reset();
289 288 }
290 289 }
291 290 }
... ...
... ... @@ -293,21 +293,21 @@ class DefaultTbContext implements TbContext {
293 293 @Override
294 294 public void logJsEvalRequest() {
295 295 if (mainCtx.isStatisticsEnabled()) {
296   - mainCtx.getJsInvokeRequestsCount().incrementAndGet();
  296 + mainCtx.getJsInvokeStats().incrementRequests();
297 297 }
298 298 }
299 299
300 300 @Override
301 301 public void logJsEvalResponse() {
302 302 if (mainCtx.isStatisticsEnabled()) {
303   - mainCtx.getJsInvokeResponsesCount().incrementAndGet();
  303 + mainCtx.getJsInvokeStats().incrementResponses();
304 304 }
305 305 }
306 306
307 307 @Override
308 308 public void logJsEvalFailure() {
309 309 if (mainCtx.isStatisticsEnabled()) {
310   - mainCtx.getJsInvokeFailuresCount().incrementAndGet();
  310 + mainCtx.getJsInvokeStats().incrementFailures();
311 311 }
312 312 }
313 313
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.metrics;
  17 +
  18 +import io.micrometer.core.instrument.Counter;
  19 +
  20 +public class StubCounter implements Counter {
  21 + @Override
  22 + public void increment(double amount) {}
  23 +
  24 + @Override
  25 + public double count() {
  26 + return 0;
  27 + }
  28 +
  29 + @Override
  30 + public Id getId() {
  31 + return null;
  32 + }
  33 +}
... ...
... ... @@ -53,6 +53,7 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
53 53 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
54 54 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
55 55 import org.thingsboard.server.service.state.DeviceStateService;
  56 +import org.thingsboard.server.service.stats.StatsCounterFactory;
56 57 import org.thingsboard.server.service.subscription.SubscriptionManagerService;
57 58 import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
58 59 import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
... ... @@ -88,18 +89,19 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
88 89 private final TbLocalSubscriptionService localSubscriptionService;
89 90 private final SubscriptionManagerService subscriptionManagerService;
90 91 private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
91   - private final TbCoreConsumerStats stats = new TbCoreConsumerStats();
  92 + private final TbCoreConsumerStats stats;
92 93
93 94 public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext,
94 95 DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService,
95 96 SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService,
96   - TbCoreDeviceRpcService tbCoreDeviceRpcService) {
  97 + TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsCounterFactory counterFactory) {
97 98 super(actorContext, encodingService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
98 99 this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
99 100 this.stateService = stateService;
100 101 this.localSubscriptionService = localSubscriptionService;
101 102 this.subscriptionManagerService = subscriptionManagerService;
102 103 this.tbCoreDeviceRpcService = tbCoreDeviceRpcService;
  104 + this.stats = new TbCoreConsumerStats(counterFactory);
103 105 }
104 106
105 107 @PostConstruct
... ... @@ -235,6 +237,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
235 237 public void printStats() {
236 238 if (statsEnabled) {
237 239 stats.printStats();
  240 + stats.reset();
238 241 }
239 242 }
240 243
... ...
... ... @@ -52,6 +52,7 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrateg
52 52 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
53 53 import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
54 54 import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
  55 +import org.thingsboard.server.service.stats.StatsCounterFactory;
55 56
56 57 import javax.annotation.PostConstruct;
57 58 import javax.annotation.PreDestroy;
... ... @@ -79,6 +80,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
79 80 @Value("${queue.rule-engine.stats.enabled:true}")
80 81 private boolean statsEnabled;
81 82
  83 + private final StatsCounterFactory counterFactory;
82 84 private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory;
83 85 private final TbRuleEngineProcessingStrategyFactory processingStrategyFactory;
84 86 private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
... ... @@ -95,7 +97,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
95 97 TbQueueRuleEngineSettings ruleEngineSettings,
96 98 TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
97 99 ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
98   - TbRuleEngineDeviceRpcService tbDeviceRpcService) {
  100 + TbRuleEngineDeviceRpcService tbDeviceRpcService,
  101 + StatsCounterFactory counterFactory) {
99 102 super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
100 103 this.statisticsService = statisticsService;
101 104 this.ruleEngineSettings = ruleEngineSettings;
... ... @@ -103,6 +106,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
103 106 this.submitStrategyFactory = submitStrategyFactory;
104 107 this.processingStrategyFactory = processingStrategyFactory;
105 108 this.tbDeviceRpcService = tbDeviceRpcService;
  109 + this.counterFactory = counterFactory;
106 110 }
107 111
108 112 @PostConstruct
... ... @@ -111,7 +115,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
111 115 for (TbRuleEngineQueueConfiguration configuration : ruleEngineSettings.getQueues()) {
112 116 consumerConfigurations.putIfAbsent(configuration.getName(), configuration);
113 117 consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
114   - consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName()));
  118 + consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), counterFactory));
115 119 }
116 120 submitExecutor = Executors.newSingleThreadExecutor();
117 121 }
... ... @@ -269,6 +273,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
269 273 consumerStats.forEach((queue, stats) -> {
270 274 stats.printStats();
271 275 statisticsService.reportQueueStats(ts, stats);
  276 + stats.reset();
272 277 });
273 278 }
274 279 }
... ...
... ... @@ -17,76 +17,124 @@ package org.thingsboard.server.service.queue;
17 17
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.gen.transport.TransportProtos;
  20 +import org.thingsboard.server.service.stats.StatsCounter;
  21 +import org.thingsboard.server.service.stats.StatsCounterFactory;
  22 +import org.thingsboard.server.service.stats.StatsType;
20 23
  24 +import java.util.*;
21 25 import java.util.concurrent.atomic.AtomicInteger;
22 26
23 27 @Slf4j
24 28 public class TbCoreConsumerStats {
  29 + public static final String TOTAL_MSGS = "totalMsgs";
  30 + public static final String SESSION_EVENTS = "sessionEvents";
  31 + public static final String GET_ATTRIBUTE = "getAttr";
  32 + public static final String ATTRIBUTE_SUBSCRIBES = "subToAttr";
  33 + public static final String RPC_SUBSCRIBES = "subToRpc";
  34 + public static final String TO_DEVICE_RPC_CALL_RESPONSES = "toDevRpc";
  35 + public static final String SUBSCRIPTION_INFO = "subInfo";
  36 + public static final String DEVICE_CLAIMS = "claimDevice";
  37 + public static final String DEVICE_STATES = "deviceState";
  38 + public static final String SUBSCRIPTION_MSGS = "subMsgs";
  39 + public static final String TO_CORE_NOTIFICATIONS = "coreNfs";
25 40
26   - private final AtomicInteger totalCounter = new AtomicInteger(0);
27   - private final AtomicInteger sessionEventCounter = new AtomicInteger(0);
28   - private final AtomicInteger getAttributesCounter = new AtomicInteger(0);
29   - private final AtomicInteger subscribeToAttributesCounter = new AtomicInteger(0);
30   - private final AtomicInteger subscribeToRPCCounter = new AtomicInteger(0);
31   - private final AtomicInteger toDeviceRPCCallResponseCounter = new AtomicInteger(0);
32   - private final AtomicInteger subscriptionInfoCounter = new AtomicInteger(0);
33   - private final AtomicInteger claimDeviceCounter = new AtomicInteger(0);
  41 + private final StatsCounter totalCounter;
  42 + private final StatsCounter sessionEventCounter;
  43 + private final StatsCounter getAttributesCounter;
  44 + private final StatsCounter subscribeToAttributesCounter;
  45 + private final StatsCounter subscribeToRPCCounter;
  46 + private final StatsCounter toDeviceRPCCallResponseCounter;
  47 + private final StatsCounter subscriptionInfoCounter;
  48 + private final StatsCounter claimDeviceCounter;
34 49
35   - private final AtomicInteger deviceStateCounter = new AtomicInteger(0);
36   - private final AtomicInteger subscriptionMsgCounter = new AtomicInteger(0);
37   - private final AtomicInteger toCoreNotificationsCounter = new AtomicInteger(0);
  50 + private final StatsCounter deviceStateCounter;
  51 + private final StatsCounter subscriptionMsgCounter;
  52 + private final StatsCounter toCoreNotificationsCounter;
  53 +
  54 + private final List<StatsCounter> counters = new ArrayList<>();
  55 +
  56 + public TbCoreConsumerStats(StatsCounterFactory counterFactory) {
  57 + String statsKey = StatsType.CORE.getName();
  58 +
  59 + this.totalCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
  60 + this.sessionEventCounter = counterFactory.createStatsCounter(statsKey, SESSION_EVENTS);
  61 + this.getAttributesCounter = counterFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
  62 + this.subscribeToAttributesCounter = counterFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
  63 + this.subscribeToRPCCounter = counterFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
  64 + this.toDeviceRPCCallResponseCounter = counterFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
  65 + this.subscriptionInfoCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
  66 + this.claimDeviceCounter = counterFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
  67 + this.deviceStateCounter = counterFactory.createStatsCounter(statsKey, DEVICE_STATES);
  68 + this.subscriptionMsgCounter = counterFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
  69 + this.toCoreNotificationsCounter = counterFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
  70 +
  71 +
  72 + counters.add(totalCounter);
  73 + counters.add(sessionEventCounter);
  74 + counters.add(getAttributesCounter);
  75 + counters.add(subscribeToAttributesCounter);
  76 + counters.add(subscribeToRPCCounter);
  77 + counters.add(toDeviceRPCCallResponseCounter);
  78 + counters.add(subscriptionInfoCounter);
  79 + counters.add(claimDeviceCounter);
  80 +
  81 + counters.add(deviceStateCounter);
  82 + counters.add(subscriptionMsgCounter);
  83 + counters.add(toCoreNotificationsCounter);
  84 + }
38 85
39 86 public void log(TransportProtos.TransportToDeviceActorMsg msg) {
40   - totalCounter.incrementAndGet();
  87 + totalCounter.increment();
41 88 if (msg.hasSessionEvent()) {
42   - sessionEventCounter.incrementAndGet();
  89 + sessionEventCounter.increment();
43 90 }
44 91 if (msg.hasGetAttributes()) {
45   - getAttributesCounter.incrementAndGet();
  92 + getAttributesCounter.increment();
46 93 }
47 94 if (msg.hasSubscribeToAttributes()) {
48   - subscribeToAttributesCounter.incrementAndGet();
  95 + subscribeToAttributesCounter.increment();
49 96 }
50 97 if (msg.hasSubscribeToRPC()) {
51   - subscribeToRPCCounter.incrementAndGet();
  98 + subscribeToRPCCounter.increment();
52 99 }
53 100 if (msg.hasToDeviceRPCCallResponse()) {
54   - toDeviceRPCCallResponseCounter.incrementAndGet();
  101 + toDeviceRPCCallResponseCounter.increment();
55 102 }
56 103 if (msg.hasSubscriptionInfo()) {
57   - subscriptionInfoCounter.incrementAndGet();
  104 + subscriptionInfoCounter.increment();
58 105 }
59 106 if (msg.hasClaimDevice()) {
60   - claimDeviceCounter.incrementAndGet();
  107 + claimDeviceCounter.increment();
61 108 }
62 109 }
63 110
64 111 public void log(TransportProtos.DeviceStateServiceMsgProto msg) {
65   - totalCounter.incrementAndGet();
66   - deviceStateCounter.incrementAndGet();
  112 + totalCounter.increment();
  113 + deviceStateCounter.increment();
67 114 }
68 115
69 116 public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
70   - totalCounter.incrementAndGet();
71   - subscriptionMsgCounter.incrementAndGet();
  117 + totalCounter.increment();
  118 + subscriptionMsgCounter.increment();
72 119 }
73 120
74 121 public void log(TransportProtos.ToCoreNotificationMsg msg) {
75   - totalCounter.incrementAndGet();
76   - toCoreNotificationsCounter.incrementAndGet();
  122 + totalCounter.increment();
  123 + toCoreNotificationsCounter.increment();
77 124 }
78 125
79 126 public void printStats() {
80   - int total = totalCounter.getAndSet(0);
  127 + int total = totalCounter.get();
81 128 if (total > 0) {
82   - log.info("Total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]" +
83   - " deviceState [{}] subMgr [{}] coreNfs [{}]",
84   - total, sessionEventCounter.getAndSet(0),
85   - getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0),
86   - subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0),
87   - subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0)
88   - , deviceStateCounter.getAndSet(0), subscriptionMsgCounter.getAndSet(0), toCoreNotificationsCounter.getAndSet(0));
  129 + StringBuilder stats = new StringBuilder();
  130 + counters.forEach(counter -> {
  131 + stats.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
  132 + });
  133 + log.info("Core Stats: {}", stats);
89 134 }
90 135 }
91 136
  137 + public void reset() {
  138 + counters.forEach(StatsCounter::clear);
  139 + }
92 140 }
... ...
... ... @@ -22,16 +22,16 @@ import org.thingsboard.server.common.msg.queue.RuleEngineException;
22 22 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
23 23 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
24 24 import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
  25 +import org.thingsboard.server.service.stats.StatsCounter;
  26 +import org.thingsboard.server.service.stats.StatsCounterFactory;
  27 +import org.thingsboard.server.service.stats.StatsType;
25 28
26   -import java.util.HashMap;
27   -import java.util.Map;
28   -import java.util.UUID;
  29 +import java.util.*;
29 30 import java.util.concurrent.ConcurrentHashMap;
30 31 import java.util.concurrent.ConcurrentMap;
31 32 import java.util.concurrent.atomic.AtomicInteger;
32 33
33 34 @Slf4j
34   -@Data
35 35 public class TbRuleEngineConsumerStats {
36 36
37 37 public static final String TOTAL_MSGS = "totalMsgs";
... ... @@ -43,61 +43,72 @@ public class TbRuleEngineConsumerStats {
43 43 public static final String SUCCESSFUL_ITERATIONS = "successfulIterations";
44 44 public static final String FAILED_ITERATIONS = "failedIterations";
45 45
46   - private final AtomicInteger totalMsgCounter = new AtomicInteger(0);
47   - private final AtomicInteger successMsgCounter = new AtomicInteger(0);
48   - private final AtomicInteger tmpTimeoutMsgCounter = new AtomicInteger(0);
49   - private final AtomicInteger tmpFailedMsgCounter = new AtomicInteger(0);
  46 + private final StatsCounter totalMsgCounter;
  47 + private final StatsCounter successMsgCounter;
  48 + private final StatsCounter tmpTimeoutMsgCounter;
  49 + private final StatsCounter tmpFailedMsgCounter;
50 50
51   - private final AtomicInteger timeoutMsgCounter = new AtomicInteger(0);
52   - private final AtomicInteger failedMsgCounter = new AtomicInteger(0);
  51 + private final StatsCounter timeoutMsgCounter;
  52 + private final StatsCounter failedMsgCounter;
53 53
54   - private final AtomicInteger successIterationsCounter = new AtomicInteger(0);
55   - private final AtomicInteger failedIterationsCounter = new AtomicInteger(0);
  54 + private final StatsCounter successIterationsCounter;
  55 + private final StatsCounter failedIterationsCounter;
56 56
57   - private final Map<String, AtomicInteger> counters = new HashMap<>();
  57 + private final List<StatsCounter> counters = new ArrayList<>();
58 58 private final ConcurrentMap<UUID, TbTenantRuleEngineStats> tenantStats = new ConcurrentHashMap<>();
59 59 private final ConcurrentMap<TenantId, RuleEngineException> tenantExceptions = new ConcurrentHashMap<>();
60 60
61 61 private final String queueName;
62 62
63   - public TbRuleEngineConsumerStats(String queueName) {
  63 + public TbRuleEngineConsumerStats(String queueName, StatsCounterFactory counterFactory) {
64 64 this.queueName = queueName;
65   - counters.put(TOTAL_MSGS, totalMsgCounter);
66   - counters.put(SUCCESSFUL_MSGS, successMsgCounter);
67   - counters.put(TIMEOUT_MSGS, timeoutMsgCounter);
68   - counters.put(FAILED_MSGS, failedMsgCounter);
69   -
70   - counters.put(TMP_TIMEOUT, tmpTimeoutMsgCounter);
71   - counters.put(TMP_FAILED, tmpFailedMsgCounter);
72   - counters.put(SUCCESSFUL_ITERATIONS, successIterationsCounter);
73   - counters.put(FAILED_ITERATIONS, failedIterationsCounter);
  65 +
  66 + String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName;
  67 + this.totalMsgCounter = counterFactory.createStatsCounter(statsKey, TOTAL_MSGS);
  68 + this.successMsgCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_MSGS);
  69 + this.timeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TIMEOUT_MSGS);
  70 + this.failedMsgCounter = counterFactory.createStatsCounter(statsKey, FAILED_MSGS);
  71 + this.tmpTimeoutMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_TIMEOUT);
  72 + this.tmpFailedMsgCounter = counterFactory.createStatsCounter(statsKey, TMP_FAILED);
  73 + this.successIterationsCounter = counterFactory.createStatsCounter(statsKey, SUCCESSFUL_ITERATIONS);
  74 + this.failedIterationsCounter = counterFactory.createStatsCounter(statsKey, FAILED_ITERATIONS);
  75 +
  76 + counters.add(totalMsgCounter);
  77 + counters.add(successMsgCounter);
  78 + counters.add(timeoutMsgCounter);
  79 + counters.add(failedMsgCounter);
  80 +
  81 + counters.add(tmpTimeoutMsgCounter);
  82 + counters.add(tmpFailedMsgCounter);
  83 + counters.add(successIterationsCounter);
  84 + counters.add(failedIterationsCounter);
74 85 }
75 86
76 87 public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) {
77 88 int success = msg.getSuccessMap().size();
78 89 int pending = msg.getPendingMap().size();
79 90 int failed = msg.getFailedMap().size();
80   - totalMsgCounter.addAndGet(success + pending + failed);
81   - successMsgCounter.addAndGet(success);
  91 + totalMsgCounter.add(success + pending + failed);
  92 + successMsgCounter.add(success);
82 93 msg.getSuccessMap().values().forEach(m -> getTenantStats(m).logSuccess());
83 94 if (finalIterationForPack) {
84 95 if (pending > 0 || failed > 0) {
85   - timeoutMsgCounter.addAndGet(pending);
86   - failedMsgCounter.addAndGet(failed);
  96 + timeoutMsgCounter.add(pending);
  97 + failedMsgCounter.add(failed);
87 98 if (pending > 0) {
88 99 msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTimeout());
89 100 }
90 101 if (failed > 0) {
91 102 msg.getFailedMap().values().forEach(m -> getTenantStats(m).logFailed());
92 103 }
93   - failedIterationsCounter.incrementAndGet();
  104 + failedIterationsCounter.increment();
94 105 } else {
95   - successIterationsCounter.incrementAndGet();
  106 + successIterationsCounter.increment();
96 107 }
97 108 } else {
98   - failedIterationsCounter.incrementAndGet();
99   - tmpTimeoutMsgCounter.addAndGet(pending);
100   - tmpFailedMsgCounter.addAndGet(failed);
  109 + failedIterationsCounter.increment();
  110 + tmpTimeoutMsgCounter.add(pending);
  111 + tmpFailedMsgCounter.add(failed);
101 112 if (pending > 0) {
102 113 msg.getPendingMap().values().forEach(m -> getTenantStats(m).logTmpTimeout());
103 114 }
... ... @@ -113,19 +124,31 @@ public class TbRuleEngineConsumerStats {
113 124 return tenantStats.computeIfAbsent(new UUID(reMsg.getTenantIdMSB(), reMsg.getTenantIdLSB()), TbTenantRuleEngineStats::new);
114 125 }
115 126
  127 + public ConcurrentMap<UUID, TbTenantRuleEngineStats> getTenantStats() {
  128 + return tenantStats;
  129 + }
  130 +
  131 + public String getQueueName() {
  132 + return queueName;
  133 + }
  134 +
  135 + public ConcurrentMap<TenantId, RuleEngineException> getTenantExceptions() {
  136 + return tenantExceptions;
  137 + }
  138 +
116 139 public void printStats() {
117 140 int total = totalMsgCounter.get();
118 141 if (total > 0) {
119 142 StringBuilder stats = new StringBuilder();
120   - counters.forEach((label, value) -> {
121   - stats.append(label).append(" = [").append(value.get()).append("] ");
  143 + counters.forEach(counter -> {
  144 + stats.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
122 145 });
123 146 log.info("[{}] Stats: {}", queueName, stats);
124 147 }
125 148 }
126 149
127 150 public void reset() {
128   - counters.values().forEach(counter -> counter.set(0));
  151 + counters.forEach(StatsCounter::clear);
129 152 tenantStats.clear();
130 153 tenantExceptions.clear();
131 154 }
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.stats;
  17 +
  18 +import org.springframework.beans.factory.annotation.Autowired;
  19 +import org.springframework.stereotype.Service;
  20 +import org.thingsboard.server.actors.JsInvokeStats;
  21 +
  22 +import javax.annotation.PostConstruct;
  23 +
  24 +@Service
  25 +public class DefaultJsInvokeStats implements JsInvokeStats {
  26 + private static final String REQUESTS = "requests";
  27 + private static final String RESPONSES = "responses";
  28 + private static final String FAILURES = "failures";
  29 +
  30 + private StatsCounter requestsCounter;
  31 + private StatsCounter responsesCounter;
  32 + private StatsCounter failuresCounter;
  33 +
  34 + @Autowired
  35 + private StatsCounterFactory counterFactory;
  36 +
  37 + @PostConstruct
  38 + public void init() {
  39 + String key = StatsType.JS_INVOKE.getName();
  40 + this.requestsCounter = counterFactory.createStatsCounter(key, REQUESTS);
  41 + this.responsesCounter = counterFactory.createStatsCounter(key, RESPONSES);
  42 + this.failuresCounter = counterFactory.createStatsCounter(key, FAILURES);
  43 + }
  44 +
  45 + @Override
  46 + public void incrementRequests(int amount) {
  47 + requestsCounter.add(amount);
  48 + }
  49 +
  50 + @Override
  51 + public void incrementResponses(int amount) {
  52 + responsesCounter.add(amount);
  53 + }
  54 +
  55 + @Override
  56 + public void incrementFailures(int amount) {
  57 + failuresCounter.add(amount);
  58 + }
  59 +
  60 + @Override
  61 + public int getRequests() {
  62 + return requestsCounter.get();
  63 + }
  64 +
  65 + @Override
  66 + public int getResponses() {
  67 + return responsesCounter.get();
  68 + }
  69 +
  70 + @Override
  71 + public int getFailures() {
  72 + return failuresCounter.get();
  73 + }
  74 +
  75 + @Override
  76 + public void reset() {
  77 + requestsCounter.clear();
  78 + responsesCounter.clear();
  79 + failuresCounter.clear();
  80 + }
  81 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.stats;
  17 +
  18 +import org.thingsboard.server.queue.stats.QueueStats;
  19 +
  20 +public class DefaultQueueStats implements QueueStats {
  21 + private final StatsCounter totalCounter;
  22 + private final StatsCounter successfulCounter;
  23 + private final StatsCounter failedCounter;
  24 +
  25 + public DefaultQueueStats(StatsCounter totalCounter, StatsCounter successfulCounter, StatsCounter failedCounter) {
  26 + this.totalCounter = totalCounter;
  27 + this.successfulCounter = successfulCounter;
  28 + this.failedCounter = failedCounter;
  29 + }
  30 +
  31 + @Override
  32 + public void incrementTotal(int amount) {
  33 + totalCounter.add(amount);
  34 + }
  35 +
  36 + @Override
  37 + public void incrementSuccessful(int amount) {
  38 + successfulCounter.add(amount);
  39 + }
  40 +
  41 + @Override
  42 + public void incrementFailed(int amount) {
  43 + failedCounter.add(amount);
  44 + }
  45 +}
... ...
... ... @@ -104,7 +104,6 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
104 104 }
105 105 }
106 106 });
107   - ruleEngineStats.reset();
108 107 }
109 108
110 109 private AssetId getServiceAssetId(TenantId tenantId, String queueName) {
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.stats;
  17 +
  18 +import io.micrometer.core.instrument.Counter;
  19 +
  20 +import java.util.concurrent.atomic.AtomicInteger;
  21 +
  22 +public class StatsCounter {
  23 + private final AtomicInteger aiCounter;
  24 + private final Counter micrometerCounter;
  25 + private final String name;
  26 +
  27 + public StatsCounter(AtomicInteger aiCounter, Counter micrometerCounter, String name) {
  28 + this.aiCounter = aiCounter;
  29 + this.micrometerCounter = micrometerCounter;
  30 + this.name = name;
  31 + }
  32 +
  33 + public void increment() {
  34 + aiCounter.incrementAndGet();
  35 + micrometerCounter.increment();
  36 + }
  37 +
  38 + public void clear() {
  39 + aiCounter.set(0);
  40 + }
  41 +
  42 + public int get() {
  43 + return aiCounter.get();
  44 + }
  45 +
  46 + public void add(int delta){
  47 + aiCounter.addAndGet(delta);
  48 + micrometerCounter.increment(delta);
  49 + }
  50 +
  51 + public String getName() {
  52 + return name;
  53 + }
  54 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.stats;
  17 +
  18 +import io.micrometer.core.instrument.Counter;
  19 +import io.micrometer.core.instrument.MeterRegistry;
  20 +import org.springframework.beans.factory.annotation.Autowired;
  21 +import org.springframework.beans.factory.annotation.Value;
  22 +import org.springframework.stereotype.Service;
  23 +import org.thingsboard.server.service.metrics.StubCounter;
  24 +
  25 +import java.util.concurrent.atomic.AtomicInteger;
  26 +
  27 +@Service
  28 +public class StatsCounterFactory {
  29 + private static final String STATS_NAME_TAG = "statsName";
  30 +
  31 + private static final Counter STUB_COUNTER = new StubCounter();
  32 +
  33 + @Autowired
  34 + private MeterRegistry meterRegistry;
  35 +
  36 + @Value("${metrics.enabled}")
  37 + private Boolean metricsEnabled;
  38 +
  39 + public StatsCounter createStatsCounter(String key, String statsName) {
  40 + return new StatsCounter(
  41 + new AtomicInteger(0),
  42 + metricsEnabled ?
  43 + meterRegistry.counter(key, STATS_NAME_TAG, statsName)
  44 + : STUB_COUNTER,
  45 + statsName
  46 + );
  47 + }
  48 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.stats;
  17 +
  18 +public enum StatsType {
  19 + RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke");
  20 +
  21 + private String name;
  22 +
  23 + StatsType(String name) {
  24 + this.name = name;
  25 + }
  26 +
  27 + public String getName() {
  28 + return name;
  29 + }
  30 +}
... ...
... ... @@ -29,6 +29,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM
29 29 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
30 30 import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
31 31 import org.thingsboard.server.queue.util.TbCoreComponent;
  32 +import org.thingsboard.server.service.stats.DefaultQueueStats;
  33 +import org.thingsboard.server.service.stats.StatsCounter;
  34 +import org.thingsboard.server.service.stats.StatsCounterFactory;
  35 +import org.thingsboard.server.service.stats.StatsType;
32 36
33 37 import javax.annotation.PostConstruct;
34 38 import javax.annotation.PreDestroy;
... ... @@ -41,9 +45,13 @@ import java.util.concurrent.*;
41 45 @Service
42 46 @TbCoreComponent
43 47 public class TbCoreTransportApiService {
  48 + private static final String TOTAL_MSGS = "totalMsgs";
  49 + private static final String SUCCESSFUL_MSGS = "successfulMsgs";
  50 + private static final String FAILED_MSGS = "failedMsgs";
44 51
45 52 private final TbCoreQueueFactory tbCoreQueueFactory;
46 53 private final TransportApiService transportApiService;
  54 + private final StatsCounterFactory counterFactory;
47 55
48 56 @Value("${queue.transport_api.max_pending_requests:10000}")
49 57 private int maxPendingRequests;
... ... @@ -58,9 +66,10 @@ public class TbCoreTransportApiService {
58 66 private TbQueueResponseTemplate<TbProtoQueueMsg<TransportApiRequestMsg>,
59 67 TbProtoQueueMsg<TransportApiResponseMsg>> transportApiTemplate;
60 68
61   - public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService) {
  69 + public TbCoreTransportApiService(TbCoreQueueFactory tbCoreQueueFactory, TransportApiService transportApiService, StatsCounterFactory counterFactory) {
62 70 this.tbCoreQueueFactory = tbCoreQueueFactory;
63 71 this.transportApiService = transportApiService;
  72 + this.counterFactory = counterFactory;
64 73 }
65 74
66 75 @PostConstruct
... ... @@ -69,6 +78,12 @@ public class TbCoreTransportApiService {
69 78 TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = tbCoreQueueFactory.createTransportApiResponseProducer();
70 79 TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();
71 80
  81 + String key = StatsType.TRANSPORT.getName();
  82 + StatsCounter totalCounter = counterFactory.createStatsCounter(key, TOTAL_MSGS);
  83 + StatsCounter successfulCounter = counterFactory.createStatsCounter(key, SUCCESSFUL_MSGS);
  84 + StatsCounter failedCounter = counterFactory.createStatsCounter(key, FAILED_MSGS);
  85 + DefaultQueueStats queueStats = new DefaultQueueStats(totalCounter, successfulCounter, failedCounter);
  86 +
72 87 DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
73 88 <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder();
74 89 builder.requestTemplate(consumer);
... ... @@ -78,6 +93,7 @@ public class TbCoreTransportApiService {
78 93 builder.pollInterval(responsePollDuration);
79 94 builder.executor(transportCallbackExecutor);
80 95 builder.handler(transportApiService);
  96 + builder.stats(queueStats);
81 97 transportApiTemplate = builder.build();
82 98 }
83 99
... ...
... ... @@ -27,7 +27,6 @@ import java.sql.Statement;
27 27
28 28
29 29 @Slf4j
30   -@PsqlDao
31 30 public abstract class AbstractCleanUpService {
32 31
33 32 @Value("${spring.datasource.url}")
... ...
... ... @@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.scheduling.annotation.Scheduled;
21 21 import org.springframework.stereotype.Service;
22 22 import org.thingsboard.server.dao.util.PsqlDao;
  23 +import org.thingsboard.server.dao.util.SqlDao;
23 24 import org.thingsboard.server.service.ttl.AbstractCleanUpService;
24 25
25 26 import java.sql.Connection;
... ... @@ -27,6 +28,7 @@ import java.sql.DriverManager;
27 28 import java.sql.SQLException;
28 29
29 30 @PsqlDao
  31 +@SqlDao
30 32 @Slf4j
31 33 @Service
32 34 public class EventsCleanUpService extends AbstractCleanUpService {
... ...
... ... @@ -18,14 +18,12 @@ package org.thingsboard.server.service.ttl.timeseries;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.beans.factory.annotation.Value;
20 20 import org.springframework.scheduling.annotation.Scheduled;
21   -import org.thingsboard.server.dao.util.PsqlTsAnyDao;
22 21 import org.thingsboard.server.service.ttl.AbstractCleanUpService;
23 22
24 23 import java.sql.Connection;
25 24 import java.sql.DriverManager;
26 25 import java.sql.SQLException;
27 26
28   -@PsqlTsAnyDao
29 27 @Slf4j
30 28 public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpService {
31 29
... ...
... ... @@ -257,14 +257,17 @@ sql:
257 257 batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}"
258 258 batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}"
259 259 stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:10000}"
  260 + batch_threads: "${SQL_ATTRIBUTES_BATCH_THREADS:4}"
260 261 ts:
261 262 batch_size: "${SQL_TS_BATCH_SIZE:10000}"
262 263 batch_max_delay: "${SQL_TS_BATCH_MAX_DELAY_MS:100}"
263 264 stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}"
  265 + batch_threads: "${SQL_TS_BATCH_THREADS:4}"
264 266 ts_latest:
265 267 batch_size: "${SQL_TS_LATEST_BATCH_SIZE:10000}"
266 268 batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:100}"
267 269 stats_print_interval_ms: "${SQL_TS_LATEST_BATCH_STATS_PRINT_MS:10000}"
  270 + batch_threads: "${SQL_TS_LATEST_BATCH_THREADS:4}"
268 271 # Specify whether to remove null characters from strValue of attributes and timeseries before insert
269 272 remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}"
270 273 postgres:
... ... @@ -273,6 +276,7 @@ sql:
273 276 timescale:
274 277 # Specify Interval size for new data chunks storage.
275 278 chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}"
  279 + batch_threads: "${SQL_TIMESCALE_BATCH_THREADS:4}"
276 280 ttl:
277 281 ts:
278 282 enabled: "${SQL_TTL_TS_ENABLED:true}"
... ... @@ -451,7 +455,7 @@ spring:
451 455 username: "${SPRING_DATASOURCE_USERNAME:postgres}"
452 456 password: "${SPRING_DATASOURCE_PASSWORD:postgres}"
453 457 hikari:
454   - maximumPoolSize: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:5}"
  458 + maximumPoolSize: "${SPRING_DATASOURCE_MAXIMUM_POOL_SIZE:16}"
455 459
456 460 # Audit log parameters
457 461 audit-log:
... ... @@ -760,3 +764,13 @@ service:
760 764 id: "${TB_SERVICE_ID:}"
761 765 tenant_id: "${TB_SERVICE_TENANT_ID:}" # empty or specific tenant id.
762 766
  767 +metrics:
  768 + # Enable/disable actuator metrics.
  769 + enabled: "${METRICS_ENABLED:false}"
  770 +
  771 +management:
  772 + endpoints:
  773 + web:
  774 + exposure:
  775 + # Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
  776 + include: '${METRICS_ENDPOINTS_EXPOSE:info}'
\ No newline at end of file
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>common</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common</groupId>
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.actors;
  17 +
  18 +public interface JsInvokeStats {
  19 + default void incrementRequests() {
  20 + incrementRequests(1);
  21 + }
  22 +
  23 + void incrementRequests(int amount);
  24 +
  25 + default void incrementResponses() {
  26 + incrementResponses(1);
  27 + }
  28 +
  29 + void incrementResponses(int amount);
  30 +
  31 + default void incrementFailures() {
  32 + incrementFailures(1);
  33 + }
  34 +
  35 + void incrementFailures(int amount);
  36 +
  37 + int getRequests();
  38 +
  39 + int getResponses();
  40 +
  41 + int getFailures();
  42 +
  43 + void reset();
  44 +}
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>common</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common</groupId>
... ...
... ... @@ -17,5 +17,9 @@ package org.thingsboard.server.dao.util;
17 17
18 18 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
19 19
  20 +import java.lang.annotation.Retention;
  21 +import java.lang.annotation.RetentionPolicy;
  22 +
  23 +@Retention(RetentionPolicy.RUNTIME)
20 24 @ConditionalOnExpression("'${database.ts.type}'=='sql' && '${spring.jpa.database-platform}'=='org.hibernate.dialect.PostgreSQLDialect'")
21 25 public @interface PsqlTsDao { }
\ No newline at end of file
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>common</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common</groupId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>common</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common</groupId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>thingsboard</artifactId>
25 25 </parent>
26 26 <artifactId>common</artifactId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>common</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common</groupId>
... ...
... ... @@ -91,8 +91,6 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
91 91 List<Response> responses = responseTemplate.poll(pollInterval);
92 92 if (responses.size() > 0) {
93 93 log.trace("Polling responses completed, consumer records count [{}]", responses.size());
94   - } else {
95   - continue;
96 94 }
97 95 responses.forEach(response -> {
98 96 byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
... ...
... ... @@ -23,6 +23,7 @@ import org.thingsboard.server.queue.TbQueueHandler;
23 23 import org.thingsboard.server.queue.TbQueueMsg;
24 24 import org.thingsboard.server.queue.TbQueueProducer;
25 25 import org.thingsboard.server.queue.TbQueueResponseTemplate;
  26 +import org.thingsboard.server.queue.stats.QueueStats;
26 27
27 28 import java.util.List;
28 29 import java.util.UUID;
... ... @@ -44,6 +45,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
44 45 private final ExecutorService loopExecutor;
45 46 private final ScheduledExecutorService timeoutExecutor;
46 47 private final ExecutorService callbackExecutor;
  48 + private final QueueStats stats;
47 49 private final int maxPendingRequests;
48 50 private final long requestTimeout;
49 51
... ... @@ -58,7 +60,8 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
58 60 long pollInterval,
59 61 long requestTimeout,
60 62 int maxPendingRequests,
61   - ExecutorService executor) {
  63 + ExecutorService executor,
  64 + QueueStats stats) {
62 65 this.requestTemplate = requestTemplate;
63 66 this.responseTemplate = responseTemplate;
64 67 this.pendingRequests = new ConcurrentHashMap<>();
... ... @@ -66,6 +69,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
66 69 this.pollInterval = pollInterval;
67 70 this.requestTimeout = requestTimeout;
68 71 this.callbackExecutor = executor;
  72 + this.stats = stats;
69 73 this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
70 74 this.loopExecutor = Executors.newSingleThreadExecutor();
71 75 }
... ... @@ -108,11 +112,13 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
108 112 String responseTopic = bytesToString(responseTopicHeader);
109 113 try {
110 114 pendingRequestCount.getAndIncrement();
  115 + stats.incrementTotal();
111 116 AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
112 117 response -> {
113 118 pendingRequestCount.decrementAndGet();
114 119 response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
115 120 responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
  121 + stats.incrementSuccessful();
116 122 },
117 123 e -> {
118 124 pendingRequestCount.decrementAndGet();
... ... @@ -121,6 +127,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
121 127 } else {
122 128 log.trace("[{}] Failed to process the request: {}", requestId, request, e);
123 129 }
  130 + stats.incrementFailed();
124 131 },
125 132 requestTimeout,
126 133 timeoutExecutor,
... ... @@ -128,6 +135,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
128 135 } catch (Throwable e) {
129 136 pendingRequestCount.decrementAndGet();
130 137 log.warn("[{}] Failed to process the request: {}", requestId, request, e);
  138 + stats.incrementFailed();
131 139 }
132 140 }
133 141 });
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.stats;
  17 +
  18 +public interface QueueStats {
  19 + default void incrementTotal() {
  20 + incrementTotal(1);
  21 + }
  22 +
  23 + void incrementTotal(int amount);
  24 +
  25 + default void incrementSuccessful() {
  26 + incrementSuccessful(1);
  27 + }
  28 +
  29 + void incrementSuccessful(int amount);
  30 +
  31 +
  32 + default void incrementFailed() {
  33 + incrementFailed(1);
  34 + }
  35 +
  36 + void incrementFailed(int amount);
  37 +}
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard.common</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>transport</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common.transport</groupId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard.common</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>transport</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common.transport</groupId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard.common</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>transport</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common.transport</groupId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>common</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common</groupId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard.common</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>transport</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common.transport</groupId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>common</artifactId>
25 25 </parent>
26 26 <groupId>org.thingsboard.common</groupId>
... ...
... ... @@ -20,7 +20,7 @@
20 20 <modelVersion>4.0.0</modelVersion>
21 21 <parent>
22 22 <groupId>org.thingsboard</groupId>
23   - <version>2.5.3-SNAPSHOT</version>
  23 + <version>3.1.0-SNAPSHOT</version>
24 24 <artifactId>thingsboard</artifactId>
25 25 </parent>
26 26 <artifactId>dao</artifactId>
... ... @@ -91,26 +91,26 @@
91 91 <artifactId>mockito-all</artifactId>
92 92 <scope>test</scope>
93 93 </dependency>
94   - <dependency>
95   - <groupId>org.apache.commons</groupId>
96   - <artifactId>commons-lang3</artifactId>
97   - </dependency>
98   - <dependency>
99   - <groupId>commons-validator</groupId>
100   - <artifactId>commons-validator</artifactId>
101   - </dependency>
102   - <dependency>
103   - <groupId>com.fasterxml.jackson.core</groupId>
104   - <artifactId>jackson-databind</artifactId>
105   - </dependency>
  94 + <dependency>
  95 + <groupId>org.apache.commons</groupId>
  96 + <artifactId>commons-lang3</artifactId>
  97 + </dependency>
  98 + <dependency>
  99 + <groupId>commons-collections</groupId>
  100 + <artifactId>commons-collections</artifactId>
  101 + </dependency>
  102 + <dependency>
  103 + <groupId>com.fasterxml.jackson.core</groupId>
  104 + <artifactId>jackson-databind</artifactId>
  105 + </dependency>
106 106 <dependency>
107 107 <groupId>org.springframework</groupId>
108 108 <artifactId>spring-context</artifactId>
109 109 </dependency>
110   - <dependency>
111   - <groupId>org.springframework</groupId>
112   - <artifactId>spring-tx</artifactId>
113   - </dependency>
  110 + <dependency>
  111 + <groupId>org.springframework</groupId>
  112 + <artifactId>spring-tx</artifactId>
  113 + </dependency>
114 114 <dependency>
115 115 <groupId>org.springframework</groupId>
116 116 <artifactId>spring-web</artifactId>
... ... @@ -136,8 +136,8 @@
136 136 <groupId>io.takari.junit</groupId>
137 137 <artifactId>takari-cpsuite</artifactId>
138 138 <scope>test</scope>
139   - </dependency>
140   - <dependency>
  139 + </dependency>
  140 + <dependency>
141 141 <groupId>com.google.guava</groupId>
142 142 <artifactId>guava</artifactId>
143 143 </dependency>
... ... @@ -219,18 +219,18 @@
219 219 </includes>
220 220 </configuration>
221 221 </plugin>
222   - <plugin>
223   - <groupId>org.apache.maven.plugins</groupId>
224   - <artifactId>maven-jar-plugin</artifactId>
  222 + <plugin>
  223 + <groupId>org.apache.maven.plugins</groupId>
  224 + <artifactId>maven-jar-plugin</artifactId>
225 225 <version>${jar-plugin.version}</version>
226   - <executions>
227   - <execution>
228   - <goals>
229   - <goal>test-jar</goal>
230   - </goals>
231   - </execution>
232   - </executions>
233   - </plugin>
  226 + <executions>
  227 + <execution>
  228 + <goals>
  229 + <goal>test-jar</goal>
  230 + </goals>
  231 + </execution>
  232 + </executions>
  233 + </plugin>
234 234 </plugins>
235 235 </build>
236 236 </project>
... ...
... ... @@ -17,7 +17,6 @@ package org.thingsboard.server.dao.service;
17 17
18 18 import com.fasterxml.jackson.databind.JsonNode;
19 19 import lombok.extern.slf4j.Slf4j;
20   -import org.apache.commons.validator.routines.EmailValidator;
21 20 import org.thingsboard.server.common.data.BaseData;
22 21 import org.thingsboard.server.common.data.id.TenantId;
23 22 import org.thingsboard.server.dao.exception.DataValidationException;
... ... @@ -26,11 +25,13 @@ import java.util.HashSet;
26 25 import java.util.Iterator;
27 26 import java.util.Set;
28 27 import java.util.function.Function;
  28 +import java.util.regex.Matcher;
  29 +import java.util.regex.Pattern;
29 30
30 31 @Slf4j
31 32 public abstract class DataValidator<D extends BaseData<?>> {
32   -
33   - private static EmailValidator emailValidator = EmailValidator.getInstance();
  33 + private static final Pattern EMAIL_PATTERN =
  34 + Pattern.compile("^[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,}$", Pattern.CASE_INSENSITIVE);
34 35
35 36 public void validate(D data, Function<D, TenantId> tenantIdFunction) {
36 37 try {
... ... @@ -63,12 +64,21 @@ public abstract class DataValidator<D extends BaseData<?>> {
63 64 return actualData.getId() != null && existentData.getId().equals(actualData.getId());
64 65 }
65 66
66   - protected static void validateEmail(String email) {
67   - if (!emailValidator.isValid(email)) {
  67 + public static void validateEmail(String email) {
  68 + if (!doValidateEmail(email)) {
68 69 throw new DataValidationException("Invalid email address format '" + email + "'!");
69 70 }
70 71 }
71 72
  73 + private static boolean doValidateEmail(String email) {
  74 + if (email == null) {
  75 + return false;
  76 + }
  77 +
  78 + Matcher emailMatcher = EMAIL_PATTERN.matcher(email);
  79 + return emailMatcher.matches();
  80 + }
  81 +
72 82 protected static void validateJsonStructure(JsonNode expectedNode, JsonNode actualNode) {
73 83 Set<String> expectedFields = new HashSet<>();
74 84 Iterator<String> fieldsIterator = expectedNode.fieldNames();
... ...
... ... @@ -41,16 +41,14 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
41 41 private final TbSqlBlockingQueueParams params;
42 42
43 43 private ExecutorService executor;
44   - private ScheduledLogExecutorComponent logExecutor;
45 44
46 45 public TbSqlBlockingQueue(TbSqlBlockingQueueParams params) {
47 46 this.params = params;
48 47 }
49 48
50 49 @Override
51   - public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) {
52   - this.logExecutor = logExecutor;
53   - executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + params.getLogName().toLowerCase()));
  50 + public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, int index) {
  51 + executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sql-queue-" + index + "-" + params.getLogName().toLowerCase()));
54 52 executor.submit(() -> {
55 53 String logName = params.getLogName();
56 54 int batchSize = params.getBatchSize();
... ... @@ -94,7 +92,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
94 92
95 93 logExecutor.scheduleAtFixedRate(() -> {
96 94 if (queue.size() > 0 || addedCount.get() > 0 || savedCount.get() > 0 || failedCount.get() > 0) {
97   - log.info("[{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]",
  95 + log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index,
98 96 params.getLogName(), queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0));
99 97 }
100 98 }, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sql;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +import lombok.Data;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +
  22 +import java.util.List;
  23 +import java.util.concurrent.CopyOnWriteArrayList;
  24 +import java.util.function.Consumer;
  25 +import java.util.function.Function;
  26 +
  27 +@Slf4j
  28 +@Data
  29 +public class TbSqlBlockingQueueWrapper<E> {
  30 + private final CopyOnWriteArrayList<TbSqlBlockingQueue<E>> queues = new CopyOnWriteArrayList<>();
  31 + private final TbSqlBlockingQueueParams params;
  32 + private ScheduledLogExecutorComponent logExecutor;
  33 + private final Function<E, Integer> hashCodeFunction;
  34 + private final int maxThreads;
  35 +
  36 + public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) {
  37 + for (int i = 0; i < maxThreads; i++) {
  38 + TbSqlBlockingQueue<E> queue = new TbSqlBlockingQueue<>(params);
  39 + queues.add(queue);
  40 + queue.init(logExecutor, saveFunction, i);
  41 + }
  42 + }
  43 +
  44 + public ListenableFuture<Void> add(E element) {
  45 + int queueIndex = element != null ? (hashCodeFunction.apply(element) & 0x7FFFFFFF) % maxThreads : 0;
  46 + return queues.get(queueIndex).add(element);
  47 + }
  48 +
  49 + public void destroy() {
  50 + queues.forEach(TbSqlBlockingQueue::destroy);
  51 + }
  52 +}
... ...
... ... @@ -22,7 +22,7 @@ import java.util.function.Consumer;
22 22
23 23 public interface TbSqlQueue<E> {
24 24
25   - void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction);
  25 + void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction, int queueIndex);
26 26
27 27 void destroy();
28 28
... ...
... ... @@ -31,8 +31,8 @@ import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
31 31 import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
32 32 import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
33 33 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
34   -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
35 34 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  35 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
36 36 import org.thingsboard.server.dao.util.SqlDao;
37 37
38 38 import javax.annotation.PostConstruct;
... ... @@ -40,6 +40,7 @@ import javax.annotation.PreDestroy;
40 40 import java.util.Collection;
41 41 import java.util.List;
42 42 import java.util.Optional;
  43 +import java.util.function.Function;
43 44 import java.util.stream.Collectors;
44 45
45 46 @Component
... ... @@ -65,7 +66,10 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
65 66 @Value("${sql.attributes.stats_print_interval_ms:1000}")
66 67 private long statsPrintIntervalMs;
67 68
68   - private TbSqlBlockingQueue<AttributeKvEntity> queue;
  69 + @Value("${sql.attributes.batch_threads:4}")
  70 + private int batchThreads;
  71 +
  72 + private TbSqlBlockingQueueWrapper<AttributeKvEntity> queue;
69 73
70 74 @PostConstruct
71 75 private void init() {
... ... @@ -75,7 +79,9 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
75 79 .maxDelay(maxDelay)
76 80 .statsPrintIntervalMs(statsPrintIntervalMs)
77 81 .build();
78   - queue = new TbSqlBlockingQueue<>(params);
  82 +
  83 + Function<AttributeKvEntity, Integer> hashcodeFunction = entity -> entity.getId().getEntityId().hashCode();
  84 + queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads);
79 85 queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v));
80 86 }
81 87
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sqlts;
17 17
18 18 import com.google.common.util.concurrent.Futures;
19 19 import com.google.common.util.concurrent.ListenableFuture;
  20 +import com.google.common.util.concurrent.ListeningExecutorService;
20 21 import com.google.common.util.concurrent.MoreExecutors;
21 22 import com.google.common.util.concurrent.SettableFuture;
22 23 import lombok.extern.slf4j.Slf4j;
... ... @@ -31,8 +32,8 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
31 32 import org.thingsboard.server.common.data.kv.TsKvEntry;
32 33 import org.thingsboard.server.dao.DaoUtil;
33 34 import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
34   -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
35 35 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  36 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
36 37 import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
37 38 import org.thingsboard.server.dao.sqlts.ts.TsKvRepository;
38 39 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
... ... @@ -43,6 +44,7 @@ import java.util.ArrayList;
43 44 import java.util.List;
44 45 import java.util.Optional;
45 46 import java.util.concurrent.CompletableFuture;
  47 +import java.util.function.Function;
46 48 import java.util.stream.Collectors;
47 49
48 50 @Slf4j
... ... @@ -54,7 +56,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
54 56 @Autowired
55 57 protected InsertTsRepository<TsKvEntity> insertRepository;
56 58
57   - protected TbSqlBlockingQueue<TsKvEntity> tsQueue;
  59 + protected TbSqlBlockingQueueWrapper<TsKvEntity> tsQueue;
58 60
59 61 @PostConstruct
60 62 protected void init() {
... ... @@ -65,7 +67,9 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
65 67 .maxDelay(tsMaxDelay)
66 68 .statsPrintIntervalMs(tsStatsPrintIntervalMs)
67 69 .build();
68   - tsQueue = new TbSqlBlockingQueue<>(tsParams);
  70 +
  71 + Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
  72 + tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, tsBatchThreads);
69 73 tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
70 74 }
71 75
... ...
... ... @@ -41,8 +41,8 @@ import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestCompositeKey;
41 41 import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
42 42 import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
43 43 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
44   -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
45 44 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  45 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
46 46 import org.thingsboard.server.dao.sqlts.dictionary.TsKvDictionaryRepository;
47 47 import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
48 48 import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
... ... @@ -52,7 +52,10 @@ import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
52 52 import javax.annotation.Nullable;
53 53 import javax.annotation.PostConstruct;
54 54 import javax.annotation.PreDestroy;
  55 +import java.util.ArrayList;
  56 +import java.util.HashMap;
55 57 import java.util.List;
  58 +import java.util.Map;
56 59 import java.util.Objects;
57 60 import java.util.Optional;
58 61 import java.util.concurrent.ConcurrentHashMap;
... ... @@ -82,7 +85,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
82 85 @Autowired
83 86 private TsKvDictionaryRepository dictionaryRepository;
84 87
85   - private TbSqlBlockingQueue<TsKvLatestEntity> tsLatestQueue;
  88 + private TbSqlBlockingQueueWrapper<TsKvLatestEntity> tsLatestQueue;
86 89
87 90 @Value("${sql.ts_latest.batch_size:1000}")
88 91 private int tsLatestBatchSize;
... ... @@ -93,6 +96,9 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
93 96 @Value("${sql.ts_latest.stats_print_interval_ms:1000}")
94 97 private long tsLatestStatsPrintIntervalMs;
95 98
  99 + @Value("${sql.ts_latest.batch_threads:4}")
  100 + private int tsLatestBatchThreads;
  101 +
96 102 @Autowired
97 103 protected ScheduledLogExecutorComponent logExecutor;
98 104
... ... @@ -105,6 +111,12 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
105 111 @Value("${sql.ts.stats_print_interval_ms:1000}")
106 112 protected long tsStatsPrintIntervalMs;
107 113
  114 + @Value("${sql.ts.batch_threads:4}")
  115 + protected int tsBatchThreads;
  116 +
  117 + @Value("${sql.timescale.batch_threads:4}")
  118 + protected int timescaleBatchThreads;
  119 +
108 120 @PostConstruct
109 121 protected void init() {
110 122 TbSqlBlockingQueueParams tsLatestParams = TbSqlBlockingQueueParams.builder()
... ... @@ -113,8 +125,22 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
113 125 .maxDelay(tsLatestMaxDelay)
114 126 .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs)
115 127 .build();
116   - tsLatestQueue = new TbSqlBlockingQueue<>(tsLatestParams);
117   - tsLatestQueue.init(logExecutor, v -> insertLatestTsRepository.saveOrUpdate(v));
  128 +
  129 + java.util.function.Function<TsKvLatestEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
  130 + tsLatestQueue = new TbSqlBlockingQueueWrapper<>(tsLatestParams, hashcodeFunction, tsLatestBatchThreads);
  131 +
  132 + tsLatestQueue.init(logExecutor, v -> {
  133 + Map<TsKey, TsKvLatestEntity> trueLatest = new HashMap<>();
  134 + v.forEach(ts -> {
  135 + TsKey key = new TsKey(ts.getEntityId(), ts.getKey());
  136 + TsKvLatestEntity old = trueLatest.get(key);
  137 + if (old == null || old.getTs() < ts.getTs()) {
  138 + trueLatest.put(key, ts);
  139 + }
  140 + });
  141 + List<TsKvLatestEntity> latestEntities = new ArrayList<>(trueLatest.values());
  142 + insertLatestTsRepository.saveOrUpdate(latestEntities);
  143 + });
118 144 }
119 145
120 146 @PreDestroy
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.sqlts;
  17 +
  18 +import lombok.Data;
  19 +
  20 +import java.util.UUID;
  21 +
  22 +@Data
  23 +public class TsKey {
  24 + private final UUID entityId;
  25 + private final int key;
  26 +}
... ...
... ... @@ -33,8 +33,8 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
33 33 import org.thingsboard.server.common.data.kv.TsKvEntry;
34 34 import org.thingsboard.server.dao.DaoUtil;
35 35 import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
36   -import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
37 36 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  37 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
38 38 import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
39 39 import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
40 40 import org.thingsboard.server.dao.timeseries.TimeseriesDao;
... ... @@ -48,6 +48,7 @@ import java.util.List;
48 48 import java.util.Optional;
49 49 import java.util.UUID;
50 50 import java.util.concurrent.CompletableFuture;
  51 +import java.util.function.Function;
51 52
52 53 @Component
53 54 @Slf4j
... ... @@ -63,7 +64,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
63 64 @Autowired
64 65 protected InsertTsRepository<TimescaleTsKvEntity> insertRepository;
65 66
66   - protected TbSqlBlockingQueue<TimescaleTsKvEntity> tsQueue;
  67 + protected TbSqlBlockingQueueWrapper<TimescaleTsKvEntity> tsQueue;
67 68
68 69 @PostConstruct
69 70 protected void init() {
... ... @@ -74,7 +75,10 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
74 75 .maxDelay(tsMaxDelay)
75 76 .statsPrintIntervalMs(tsStatsPrintIntervalMs)
76 77 .build();
77   - tsQueue = new TbSqlBlockingQueue<>(tsParams);
  78 +
  79 + Function<TimescaleTsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
  80 + tsQueue = new TbSqlBlockingQueueWrapper<>(tsParams, hashcodeFunction, timescaleBatchThreads);
  81 +
78 82 tsQueue.init(logExecutor, v -> insertRepository.saveOrUpdate(v));
79 83 }
80 84
... ... @@ -277,4 +281,5 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
277 281 startTs,
278 282 endTs);
279 283 }
  284 +
280 285 }
... ...
... ... @@ -21,7 +21,7 @@
21 21
22 22 <parent>
23 23 <groupId>org.thingsboard</groupId>
24   - <version>2.5.3-SNAPSHOT</version>
  24 + <version>3.1.0-SNAPSHOT</version>
25 25 <artifactId>msa</artifactId>
26 26 </parent>
27 27 <groupId>org.thingsboard.msa</groupId>
... ...