Commit d98bbacd43de4d0d9b08f751bd638be94f1859ac

Authored by vzikratyi
Committed by Andrew Shvayka
1 parent 1700cf77

Added stats to TbSqlQueue and BufferedRateExecutor

  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.msg.stats;
  17 +
  18 +import io.micrometer.core.instrument.Counter;
  19 +
  20 +import java.util.concurrent.atomic.AtomicInteger;
  21 +
  22 +public class DefaultCounter {
  23 + private final AtomicInteger aiCounter;
  24 + private final Counter micrometerCounter;
  25 +
  26 + public DefaultCounter(AtomicInteger aiCounter, Counter micrometerCounter) {
  27 + this.aiCounter = aiCounter;
  28 + this.micrometerCounter = micrometerCounter;
  29 + }
  30 +
  31 + public void increment() {
  32 + aiCounter.incrementAndGet();
  33 + micrometerCounter.increment();
  34 + }
  35 +
  36 + public void clear() {
  37 + aiCounter.set(0);
  38 + }
  39 +
  40 + public int get() {
  41 + return aiCounter.get();
  42 + }
  43 +
  44 + public void add(int delta){
  45 + aiCounter.addAndGet(delta);
  46 + micrometerCounter.increment(delta);
  47 + }
  48 +}
... ...
... ... @@ -40,4 +40,26 @@ public class DefaultMessagesStats implements MessagesStats {
40 40 public void incrementFailed(int amount) {
41 41 failedCounter.add(amount);
42 42 }
  43 +
  44 + @Override
  45 + public int getTotal() {
  46 + return totalCounter.get();
  47 + }
  48 +
  49 + @Override
  50 + public int getSuccessful() {
  51 + return successfulCounter.get();
  52 + }
  53 +
  54 + @Override
  55 + public int getFailed() {
  56 + return failedCounter.get();
  57 + }
  58 +
  59 + @Override
  60 + public void reset() {
  61 + totalCounter.clear();
  62 + successfulCounter.clear();
  63 + failedCounter.clear();
  64 + }
43 65 }
... ...
... ... @@ -17,6 +17,7 @@ package org.thingsboard.server.common.msg.stats;
17 17
18 18 import io.micrometer.core.instrument.Counter;
19 19 import io.micrometer.core.instrument.MeterRegistry;
  20 +import io.micrometer.core.instrument.Tags;
20 21 import org.springframework.beans.factory.annotation.Autowired;
21 22 import org.springframework.beans.factory.annotation.Value;
22 23 import org.springframework.stereotype.Service;
... ... @@ -51,6 +52,21 @@ public class DefaultStatsFactory implements StatsFactory {
51 52 }
52 53
53 54 @Override
  55 + public DefaultCounter createDefaultCounter(String key, String... tags) {
  56 + return new DefaultCounter(
  57 + new AtomicInteger(0),
  58 + metricsEnabled ?
  59 + meterRegistry.counter(key, tags)
  60 + : STUB_COUNTER
  61 + );
  62 + }
  63 +
  64 + @Override
  65 + public <T extends Number> T createGauge(String key, T number, String... tags) {
  66 + return meterRegistry.gauge(key, Tags.of(tags), number);
  67 + }
  68 +
  69 + @Override
54 70 public MessagesStats createMessagesStats(String key) {
55 71 StatsCounter totalCounter = createStatsCounter(key, TOTAL_MSGS);
56 72 StatsCounter successfulCounter = createStatsCounter(key, SUCCESSFUL_MSGS);
... ...
... ... @@ -28,10 +28,17 @@ public interface MessagesStats {
28 28
29 29 void incrementSuccessful(int amount);
30 30
31   -
32 31 default void incrementFailed() {
33 32 incrementFailed(1);
34 33 }
35 34
36 35 void incrementFailed(int amount);
  36 +
  37 + int getTotal();
  38 +
  39 + int getSuccessful();
  40 +
  41 + int getFailed();
  42 +
  43 + void reset();
37 44 }
... ...
... ... @@ -19,35 +19,14 @@ import io.micrometer.core.instrument.Counter;
19 19
20 20 import java.util.concurrent.atomic.AtomicInteger;
21 21
22   -public class StatsCounter {
23   - private final AtomicInteger aiCounter;
24   - private final Counter micrometerCounter;
  22 +public class StatsCounter extends DefaultCounter {
25 23 private final String name;
26 24
27 25 public StatsCounter(AtomicInteger aiCounter, Counter micrometerCounter, String name) {
28   - this.aiCounter = aiCounter;
29   - this.micrometerCounter = micrometerCounter;
  26 + super(aiCounter, micrometerCounter);
30 27 this.name = name;
31 28 }
32 29
33   - public void increment() {
34   - aiCounter.incrementAndGet();
35   - micrometerCounter.increment();
36   - }
37   -
38   - public void clear() {
39   - aiCounter.set(0);
40   - }
41   -
42   - public int get() {
43   - return aiCounter.get();
44   - }
45   -
46   - public void add(int delta){
47   - aiCounter.addAndGet(delta);
48   - micrometerCounter.increment(delta);
49   - }
50   -
51 30 public String getName() {
52 31 return name;
53 32 }
... ...
... ... @@ -18,5 +18,9 @@ package org.thingsboard.server.common.msg.stats;
18 18 public interface StatsFactory {
19 19 StatsCounter createStatsCounter(String key, String statsName);
20 20
  21 + DefaultCounter createDefaultCounter(String key, String... tags);
  22 +
  23 + <T extends Number> T createGauge(String key, T number, String... tags);
  24 +
21 25 MessagesStats createMessagesStats(String key);
22 26 }
... ...
... ... @@ -16,7 +16,7 @@
16 16 package org.thingsboard.server.common.msg.stats;
17 17
18 18 public enum StatsType {
19   - RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke");
  19 + RULE_ENGINE("ruleEngine"), CORE("core"), TRANSPORT("transport"), JS_INVOKE("jsInvoke"), RATE_EXECUTOR("rateExecutor");
20 20
21 21 private String name;
22 22
... ...
... ... @@ -24,6 +24,9 @@ import org.springframework.beans.factory.annotation.Value;
24 24 import org.springframework.scheduling.annotation.Scheduled;
25 25 import org.springframework.stereotype.Component;
26 26 import org.thingsboard.server.common.data.id.TenantId;
  27 +import org.thingsboard.server.common.msg.stats.DefaultCounter;
  28 +import org.thingsboard.server.common.msg.stats.StatsCounter;
  29 +import org.thingsboard.server.common.msg.stats.StatsFactory;
27 30 import org.thingsboard.server.dao.entity.EntityService;
28 31 import org.thingsboard.server.dao.util.AbstractBufferedRateExecutor;
29 32 import org.thingsboard.server.dao.util.AsyncTaskContext;
... ... @@ -57,48 +60,58 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
57 60 @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
58 61 @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
59 62 @Value("${cassandra.query.tenant_rate_limits.print_tenant_names}") boolean printTenantNames,
60   - @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq) {
61   - super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq);
  63 + @Value("${cassandra.query.print_queries_freq:0}") int printQueriesFreq,
  64 + @Autowired StatsFactory statsFactory) {
  65 + super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs, tenantRateLimitsEnabled, tenantRateLimitsConfiguration, printQueriesFreq, statsFactory);
62 66 this.printTenantNames = printTenantNames;
63 67 }
64 68
65 69 @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
66 70 public void printStats() {
67 71 int queueSize = getQueueSize();
68   - int totalAddedValue = totalAdded.getAndSet(0);
69   - int totalLaunchedValue = totalLaunched.getAndSet(0);
70   - int totalReleasedValue = totalReleased.getAndSet(0);
71   - int totalFailedValue = totalFailed.getAndSet(0);
72   - int totalExpiredValue = totalExpired.getAndSet(0);
73   - int totalRejectedValue = totalRejected.getAndSet(0);
74   - int totalRateLimitedValue = totalRateLimited.getAndSet(0);
75   - int rateLimitedTenantsValue = rateLimitedTenants.size();
76   - int concurrencyLevelValue = concurrencyLevel.get();
77   - if (queueSize > 0 || totalAddedValue > 0 || totalLaunchedValue > 0 || totalReleasedValue > 0 ||
78   - totalFailedValue > 0 || totalExpiredValue > 0 || totalRejectedValue > 0 || totalRateLimitedValue > 0 || rateLimitedTenantsValue > 0
79   - || concurrencyLevelValue > 0) {
80   - log.info("Permits queueSize [{}] totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] " +
81   - "totalRateLimited [{}] totalRateLimitedTenants [{}] currBuffer [{}] ",
82   - queueSize, totalAddedValue, totalLaunchedValue, totalReleasedValue,
83   - totalFailedValue, totalExpiredValue, totalRejectedValue, totalRateLimitedValue, rateLimitedTenantsValue, concurrencyLevelValue);
  72 + int rateLimitedTenantsCount = (int) stats.getRateLimitedTenants().values().stream()
  73 + .filter(defaultCounter -> defaultCounter.get() > 0)
  74 + .count();
  75 +
  76 + if (queueSize > 0
  77 + || rateLimitedTenantsCount > 0
  78 + || concurrencyLevel.get() > 0
  79 + || stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0)
  80 + ) {
  81 + StringBuilder statsBuilder = new StringBuilder();
  82 +
  83 + statsBuilder.append("queueSize").append(" = [").append(queueSize).append("] ");
  84 + stats.getStatsCounters().forEach(counter -> {
  85 + statsBuilder.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
  86 + });
  87 + statsBuilder.append("totalRateLimitedTenants").append(" = [").append(rateLimitedTenantsCount).append("] ");
  88 + statsBuilder.append(CONCURRENCY_LEVEL).append(" = [").append(concurrencyLevel.get()).append("] ");
  89 +
  90 + stats.getStatsCounters().forEach(StatsCounter::clear);
  91 + log.info("Permits {}", statsBuilder);
84 92 }
85 93
86   - rateLimitedTenants.forEach(((tenantId, counter) -> {
87   - if (printTenantNames) {
88   - String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
89   - try {
90   - return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
91   - } catch (Exception e) {
92   - log.error("[{}] Failed to get tenant name", tenantId, e);
93   - return "N/A";
  94 + stats.getRateLimitedTenants().entrySet().stream()
  95 + .filter(entry -> entry.getValue().get() > 0)
  96 + .forEach(entry -> {
  97 + TenantId tenantId = entry.getKey();
  98 + DefaultCounter counter = entry.getValue();
  99 + int rateLimitedRequests = counter.get();
  100 + counter.clear();
  101 + if (printTenantNames) {
  102 + String name = tenantNamesCache.computeIfAbsent(tenantId, tId -> {
  103 + try {
  104 + return entityService.fetchEntityNameAsync(TenantId.SYS_TENANT_ID, tenantId).get();
  105 + } catch (Exception e) {
  106 + log.error("[{}] Failed to get tenant name", tenantId, e);
  107 + return "N/A";
  108 + }
  109 + });
  110 + log.info("[{}][{}] Rate limited requests: {}", tenantId, name, rateLimitedRequests);
  111 + } else {
  112 + log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests);
94 113 }
95 114 });
96   - log.info("[{}][{}] Rate limited requests: {}", tenantId, name, counter);
97   - } else {
98   - log.info("[{}] Rate limited requests: {}", tenantId, counter);
99   - }
100   - }));
101   - rateLimitedTenants.clear();
102 115 }
103 116
104 117 @PreDestroy
... ...
... ... @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture;
19 19 import com.google.common.util.concurrent.SettableFuture;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.thingsboard.common.util.ThingsBoardThreadFactory;
  22 +import org.thingsboard.server.common.msg.stats.MessagesStats;
22 23
23 24 import java.util.ArrayList;
24 25 import java.util.List;
... ... @@ -35,12 +36,10 @@ import java.util.stream.Collectors;
35 36 public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
36 37
37 38 private final BlockingQueue<TbSqlQueueElement<E>> queue = new LinkedBlockingQueue<>();
38   - private final AtomicInteger addedCount = new AtomicInteger();
39   - private final AtomicInteger savedCount = new AtomicInteger();
40   - private final AtomicInteger failedCount = new AtomicInteger();
41 39 private final TbSqlBlockingQueueParams params;
42 40
43 41 private ExecutorService executor;
  42 + private MessagesStats stats;
44 43
45 44 public TbSqlBlockingQueue(TbSqlBlockingQueueParams params) {
46 45 this.params = params;
... ... @@ -68,7 +67,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
68 67 log.debug("[{}] Going to save {} entities", logName, entities.size());
69 68 saveFunction.accept(entities.stream().map(TbSqlQueueElement::getEntity).collect(Collectors.toList()));
70 69 entities.forEach(v -> v.getFuture().set(null));
71   - savedCount.addAndGet(entities.size());
  70 + stats.incrementSuccessful(entities.size());
72 71 if (!fullPack) {
73 72 long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs);
74 73 if (remainingDelay > 0) {
... ... @@ -76,7 +75,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
76 75 }
77 76 }
78 77 } catch (Exception e) {
79   - failedCount.addAndGet(entities.size());
  78 + stats.incrementFailed(entities.size());
80 79 entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(e));
81 80 if (e instanceof InterruptedException) {
82 81 log.info("[{}] Queue polling was interrupted", logName);
... ... @@ -91,9 +90,10 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
91 90 });
92 91
93 92 logExecutor.scheduleAtFixedRate(() -> {
94   - if (queue.size() > 0 || addedCount.get() > 0 || savedCount.get() > 0 || failedCount.get() > 0) {
  93 + if (queue.size() > 0 || stats.getTotal() > 0 || stats.getSuccessful() > 0 || stats.getFailed() > 0) {
95 94 log.info("Queue-{} [{}] queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", index,
96   - params.getLogName(), queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0));
  95 + params.getLogName(), queue.size(), stats.getTotal(), stats.getSuccessful(), stats.getFailed());
  96 + stats.reset();
97 97 }
98 98 }, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
99 99 }
... ... @@ -109,7 +109,7 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
109 109 public ListenableFuture<Void> add(E element) {
110 110 SettableFuture<Void> future = SettableFuture.create();
111 111 queue.add(new TbSqlQueueElement<>(future, element));
112   - addedCount.incrementAndGet();
  112 + params.getStats().incrementTotal();
113 113 return future;
114 114 }
115 115 }
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql;
18 18 import lombok.Builder;
19 19 import lombok.Data;
20 20 import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.common.msg.stats.MessagesStats;
21 22
22 23 @Slf4j
23 24 @Data
... ... @@ -28,4 +29,5 @@ public class TbSqlBlockingQueueParams {
28 29 private final int batchSize;
29 30 private final long maxDelay;
30 31 private final long statsPrintIntervalMs;
  32 + private final MessagesStats stats;
31 33 }
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.UUIDConverter;
26 26 import org.thingsboard.server.common.data.id.EntityId;
27 27 import org.thingsboard.server.common.data.id.TenantId;
28 28 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  29 +import org.thingsboard.server.common.msg.stats.StatsFactory;
29 30 import org.thingsboard.server.dao.DaoUtil;
30 31 import org.thingsboard.server.dao.attributes.AttributesDao;
31 32 import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
... ... @@ -60,6 +61,9 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
60 61 @Autowired
61 62 private AttributeKvInsertRepository attributeKvInsertRepository;
62 63
  64 + @Autowired
  65 + private StatsFactory statsFactory;
  66 +
63 67 @Value("${sql.attributes.batch_size:1000}")
64 68 private int batchSize;
65 69
... ... @@ -81,6 +85,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
81 85 .batchSize(batchSize)
82 86 .maxDelay(maxDelay)
83 87 .statsPrintIntervalMs(statsPrintIntervalMs)
  88 + .stats(statsFactory.createMessagesStats("attributes"))
84 89 .build();
85 90
86 91 Function<AttributeKvEntity, Integer> hashcodeFunction = entity -> entity.getId().getEntityId().hashCode();
... ...
... ... @@ -30,8 +30,10 @@ import org.thingsboard.server.common.data.kv.Aggregation;
30 30 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
31 31 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
32 32 import org.thingsboard.server.common.data.kv.TsKvEntry;
  33 +import org.thingsboard.server.common.msg.stats.StatsFactory;
33 34 import org.thingsboard.server.dao.DaoUtil;
34 35 import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
  36 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
35 37 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
36 38 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
37 39 import org.thingsboard.server.dao.sqlts.insert.InsertTsRepository;
... ... @@ -44,7 +46,6 @@ import java.util.ArrayList;
44 46 import java.util.List;
45 47 import java.util.Optional;
46 48 import java.util.concurrent.CompletableFuture;
47   -import java.util.function.Function;
48 49 import java.util.stream.Collectors;
49 50
50 51 @Slf4j
... ... @@ -57,6 +58,8 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
57 58 protected InsertTsRepository<TsKvEntity> insertRepository;
58 59
59 60 protected TbSqlBlockingQueueWrapper<TsKvEntity> tsQueue;
  61 + @Autowired
  62 + private StatsFactory statsFactory;
60 63
61 64 @PostConstruct
62 65 protected void init() {
... ... @@ -66,6 +69,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
66 69 .batchSize(tsBatchSize)
67 70 .maxDelay(tsMaxDelay)
68 71 .statsPrintIntervalMs(tsStatsPrintIntervalMs)
  72 + .stats(statsFactory.createMessagesStats("ts"))
69 73 .build();
70 74
71 75 Function<TsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
... ...
... ... @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
34 34 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
35 35 import org.thingsboard.server.common.data.kv.StringDataEntry;
36 36 import org.thingsboard.server.common.data.kv.TsKvEntry;
  37 +import org.thingsboard.server.common.msg.stats.StatsFactory;
37 38 import org.thingsboard.server.dao.DaoUtil;
38 39 import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionary;
39 40 import org.thingsboard.server.dao.model.sqlts.dictionary.TsKvDictionaryCompositeKey;
... ... @@ -102,6 +103,9 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
102 103 @Autowired
103 104 protected ScheduledLogExecutorComponent logExecutor;
104 105
  106 + @Autowired
  107 + private StatsFactory statsFactory;
  108 +
105 109 @Value("${sql.ts.batch_size:1000}")
106 110 protected int tsBatchSize;
107 111
... ... @@ -124,6 +128,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
124 128 .batchSize(tsLatestBatchSize)
125 129 .maxDelay(tsLatestMaxDelay)
126 130 .statsPrintIntervalMs(tsLatestStatsPrintIntervalMs)
  131 + .stats(statsFactory.createMessagesStats("ts.latest"))
127 132 .build();
128 133
129 134 java.util.function.Function<TsKvLatestEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
... ...
... ... @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.kv.Aggregation;
31 31 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
32 32 import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
33 33 import org.thingsboard.server.common.data.kv.TsKvEntry;
  34 +import org.thingsboard.server.common.msg.stats.StatsFactory;
34 35 import org.thingsboard.server.dao.DaoUtil;
35 36 import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
36 37 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
... ... @@ -62,6 +63,9 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
62 63 private AggregationRepository aggregationRepository;
63 64
64 65 @Autowired
  66 + private StatsFactory statsFactory;
  67 +
  68 + @Autowired
65 69 protected InsertTsRepository<TimescaleTsKvEntity> insertRepository;
66 70
67 71 protected TbSqlBlockingQueueWrapper<TimescaleTsKvEntity> tsQueue;
... ... @@ -74,6 +78,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
74 78 .batchSize(tsBatchSize)
75 79 .maxDelay(tsMaxDelay)
76 80 .statsPrintIntervalMs(tsStatsPrintIntervalMs)
  81 + .stats(statsFactory.createMessagesStats("ts.timescale"))
77 82 .build();
78 83
79 84 Function<TimescaleTsKvEntity, Integer> hashcodeFunction = entity -> entity.getEntityId().hashCode();
... ...
... ... @@ -23,10 +23,16 @@ import com.google.common.util.concurrent.SettableFuture;
23 23 import lombok.extern.slf4j.Slf4j;
24 24 import org.thingsboard.common.util.ThingsBoardThreadFactory;
25 25 import org.thingsboard.server.common.data.id.TenantId;
  26 +import org.thingsboard.server.common.msg.stats.DefaultCounter;
  27 +import org.thingsboard.server.common.msg.stats.StatsCounter;
  28 +import org.thingsboard.server.common.msg.stats.StatsFactory;
  29 +import org.thingsboard.server.common.msg.stats.StatsType;
26 30 import org.thingsboard.server.common.msg.tools.TbRateLimits;
27 31 import org.thingsboard.server.dao.nosql.CassandraStatementTask;
28 32
29 33 import javax.annotation.Nullable;
  34 +import java.util.ArrayList;
  35 +import java.util.List;
30 36 import java.util.UUID;
31 37 import java.util.concurrent.*;
32 38 import java.util.concurrent.atomic.AtomicInteger;
... ... @@ -38,6 +44,8 @@ import java.util.regex.Matcher;
38 44 @Slf4j
39 45 public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture<V>, V> implements BufferedRateExecutor<T, F> {
40 46
  47 + public static final String CONCURRENCY_LEVEL = "currBuffer";
  48 +
41 49 private final long maxWaitTime;
42 50 private final long pollMs;
43 51 private final BlockingQueue<AsyncTaskContext<T, V>> queue;
... ... @@ -49,20 +57,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
49 57 private final boolean perTenantLimitsEnabled;
50 58 private final String perTenantLimitsConfiguration;
51 59 private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
52   - protected final ConcurrentMap<TenantId, AtomicInteger> rateLimitedTenants = new ConcurrentHashMap<>();
53   -
54   - protected final AtomicInteger concurrencyLevel = new AtomicInteger();
55   - protected final AtomicInteger totalAdded = new AtomicInteger();
56   - protected final AtomicInteger totalLaunched = new AtomicInteger();
57   - protected final AtomicInteger totalReleased = new AtomicInteger();
58   - protected final AtomicInteger totalFailed = new AtomicInteger();
59   - protected final AtomicInteger totalExpired = new AtomicInteger();
60   - protected final AtomicInteger totalRejected = new AtomicInteger();
61   - protected final AtomicInteger totalRateLimited = new AtomicInteger();
62   - protected final AtomicInteger printQueriesIdx = new AtomicInteger();
  60 +
  61 + private final AtomicInteger printQueriesIdx = new AtomicInteger(0);
  62 +
  63 + protected final AtomicInteger concurrencyLevel;
  64 + protected final BufferedRateExecutorStats stats;
63 65
64 66 public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs,
65   - boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq) {
  67 + boolean perTenantLimitsEnabled, String perTenantLimitsConfiguration, int printQueriesFreq, StatsFactory statsFactory) {
66 68 this.maxWaitTime = maxWaitTime;
67 69 this.pollMs = pollMs;
68 70 this.concurrencyLimit = concurrencyLimit;
... ... @@ -73,6 +75,10 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
73 75 this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-timeout"));
74 76 this.perTenantLimitsEnabled = perTenantLimitsEnabled;
75 77 this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;
  78 + this.stats = new BufferedRateExecutorStats(statsFactory);
  79 + String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL;
  80 + this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0));
  81 +
76 82 for (int i = 0; i < dispatcherThreads; i++) {
77 83 dispatcherExecutor.submit(this::dispatch);
78 84 }
... ... @@ -89,8 +95,8 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
89 95 } else if (!task.getTenantId().isNullUid()) {
90 96 TbRateLimits rateLimits = perTenantLimits.computeIfAbsent(task.getTenantId(), id -> new TbRateLimits(perTenantLimitsConfiguration));
91 97 if (!rateLimits.tryConsume()) {
92   - rateLimitedTenants.computeIfAbsent(task.getTenantId(), tId -> new AtomicInteger(0)).incrementAndGet();
93   - totalRateLimited.incrementAndGet();
  98 + stats.incrementRateLimitedTenant(task.getTenantId());
  99 + stats.getTotalRateLimited().increment();
94 100 settableFuture.setException(new TenantRateLimitException());
95 101 perTenantLimitReached = true;
96 102 }
... ... @@ -98,10 +104,10 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
98 104 }
99 105 if (!perTenantLimitReached) {
100 106 try {
101   - totalAdded.incrementAndGet();
  107 + stats.getTotalAdded().increment();
102 108 queue.add(new AsyncTaskContext<>(UUID.randomUUID(), task, settableFuture, System.currentTimeMillis()));
103 109 } catch (IllegalStateException e) {
104   - totalRejected.incrementAndGet();
  110 + stats.getTotalRejected().increment();
105 111 settableFuture.setException(e);
106 112 }
107 113 }
... ... @@ -146,14 +152,14 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
146 152 concurrencyLevel.incrementAndGet();
147 153 long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis();
148 154 if (timeout > 0) {
149   - totalLaunched.incrementAndGet();
  155 + stats.getTotalLaunched().increment();
150 156 ListenableFuture<V> result = execute(finalTaskCtx);
151 157 result = Futures.withTimeout(result, timeout, TimeUnit.MILLISECONDS, timeoutExecutor);
152 158 Futures.addCallback(result, new FutureCallback<V>() {
153 159 @Override
154 160 public void onSuccess(@Nullable V result) {
155 161 logTask("Releasing", finalTaskCtx);
156   - totalReleased.incrementAndGet();
  162 + stats.getTotalReleased().increment();
157 163 concurrencyLevel.decrementAndGet();
158 164 finalTaskCtx.getFuture().set(result);
159 165 }
... ... @@ -165,7 +171,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
165 171 } else {
166 172 logTask("Failed", finalTaskCtx);
167 173 }
168   - totalFailed.incrementAndGet();
  174 + stats.getTotalFailed().increment();
169 175 concurrencyLevel.decrementAndGet();
170 176 finalTaskCtx.getFuture().setException(t);
171 177 log.debug("[{}] Failed to execute task: {}", finalTaskCtx.getId(), finalTaskCtx.getTask(), t);
... ... @@ -173,7 +179,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
173 179 }, callbackExecutor);
174 180 } else {
175 181 logTask("Expired Before Execution", finalTaskCtx);
176   - totalExpired.incrementAndGet();
  182 + stats.getTotalExpired().increment();
177 183 concurrencyLevel.decrementAndGet();
178 184 taskCtx.getFuture().setException(new TimeoutException());
179 185 }
... ... @@ -185,7 +191,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
185 191 } catch (Throwable e) {
186 192 if (taskCtx != null) {
187 193 log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx, e);
188   - totalFailed.incrementAndGet();
  194 + stats.getTotalFailed().increment();
189 195 concurrencyLevel.decrementAndGet();
190 196 } else {
191 197 log.debug("Failed to queue task:", e);
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.util;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.common.data.id.TenantId;
  21 +import org.thingsboard.server.common.msg.stats.DefaultCounter;
  22 +import org.thingsboard.server.common.msg.stats.StatsCounter;
  23 +import org.thingsboard.server.common.msg.stats.StatsFactory;
  24 +import org.thingsboard.server.common.msg.stats.StatsType;
  25 +
  26 +import java.util.ArrayList;
  27 +import java.util.List;
  28 +import java.util.concurrent.ConcurrentHashMap;
  29 +import java.util.concurrent.ConcurrentMap;
  30 +import java.util.concurrent.atomic.AtomicInteger;
  31 +
  32 +@Slf4j
  33 +@Getter
  34 +public class BufferedRateExecutorStats {
  35 + private static final String TENANT_ID_TAG = "tenantId";
  36 +
  37 +
  38 + private static final String TOTAL_ADDED = "totalAdded";
  39 + private static final String TOTAL_LAUNCHED = "totalLaunched";
  40 + private static final String TOTAL_RELEASED = "totalReleased";
  41 + private static final String TOTAL_FAILED = "totalFailed";
  42 + private static final String TOTAL_EXPIRED = "totalExpired";
  43 + private static final String TOTAL_REJECTED = "totalRejected";
  44 + private static final String TOTAL_RATE_LIMITED = "totalRateLimited";
  45 +
  46 + private final StatsFactory statsFactory;
  47 +
  48 + private final ConcurrentMap<TenantId, DefaultCounter> rateLimitedTenants = new ConcurrentHashMap<>();
  49 +
  50 + private final List<StatsCounter> statsCounters = new ArrayList<>();
  51 +
  52 + private final StatsCounter totalAdded;
  53 + private final StatsCounter totalLaunched;
  54 + private final StatsCounter totalReleased;
  55 + private final StatsCounter totalFailed;
  56 + private final StatsCounter totalExpired;
  57 + private final StatsCounter totalRejected;
  58 + private final StatsCounter totalRateLimited;
  59 +
  60 + public BufferedRateExecutorStats(StatsFactory statsFactory) {
  61 + this.statsFactory = statsFactory;
  62 +
  63 + String key = StatsType.RATE_EXECUTOR.getName();
  64 +
  65 + this.totalAdded = statsFactory.createStatsCounter(key, TOTAL_ADDED);
  66 + this.totalLaunched = statsFactory.createStatsCounter(key, TOTAL_LAUNCHED);
  67 + this.totalReleased = statsFactory.createStatsCounter(key, TOTAL_RELEASED);
  68 + this.totalFailed = statsFactory.createStatsCounter(key, TOTAL_FAILED);
  69 + this.totalExpired = statsFactory.createStatsCounter(key, TOTAL_EXPIRED);
  70 + this.totalRejected = statsFactory.createStatsCounter(key, TOTAL_REJECTED);
  71 + this.totalRateLimited = statsFactory.createStatsCounter(key, TOTAL_RATE_LIMITED);
  72 +
  73 + this.statsCounters.add(totalAdded);
  74 + this.statsCounters.add(totalLaunched);
  75 + this.statsCounters.add(totalReleased);
  76 + this.statsCounters.add(totalFailed);
  77 + this.statsCounters.add(totalExpired);
  78 + this.statsCounters.add(totalRejected);
  79 + this.statsCounters.add(totalRateLimited);
  80 + }
  81 +
  82 + public void incrementRateLimitedTenant(TenantId tenantId){
  83 + rateLimitedTenants.computeIfAbsent(tenantId,
  84 + tId -> {
  85 + String key = StatsType.RATE_EXECUTOR.getName() + ".tenant";
  86 + return statsFactory.createDefaultCounter(key, TENANT_ID_TAG, tId.toString());
  87 + }
  88 + )
  89 + .increment();
  90 + }
  91 +}
... ...