Commit b957a0ce8b5e7a61a48644016116530964d49d03

Authored by vzikratyi
1 parent 6b53e84c

Log time to acknowledge message

... ... @@ -22,17 +22,11 @@ import org.springframework.scheduling.annotation.Scheduled;
22 22 import org.springframework.stereotype.Service;
23 23 import org.thingsboard.rule.engine.api.RpcError;
24 24 import org.thingsboard.server.actors.ActorSystemContext;
25   -import org.thingsboard.server.common.data.id.RuleNodeId;
26 25 import org.thingsboard.server.common.data.id.TenantId;
27 26 import org.thingsboard.server.common.msg.TbActorMsg;
28 27 import org.thingsboard.server.common.msg.TbMsg;
29   -import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
30   -import org.thingsboard.server.common.msg.queue.RuleEngineException;
31   -import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
32   -import org.thingsboard.server.common.msg.queue.ServiceQueue;
33   -import org.thingsboard.server.common.msg.queue.ServiceType;
34   -import org.thingsboard.server.common.msg.queue.TbCallback;
35   -import org.thingsboard.server.common.msg.queue.TbMsgCallback;
  28 +import org.thingsboard.server.common.msg.queue.*;
  29 +import org.thingsboard.server.common.stats.StatsFactory;
36 30 import org.thingsboard.server.gen.transport.TransportProtos;
37 31 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
38 32 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
... ... @@ -42,40 +36,25 @@ import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
42 36 import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
43 37 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
44 38 import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
45   -import org.thingsboard.server.common.stats.StatsFactory;
46 39 import org.thingsboard.server.queue.util.TbRuleEngineComponent;
47 40 import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
48   -import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
49   -import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision;
50   -import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
51   -import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
52   -import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
53   -import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
54   -import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
  41 +import org.thingsboard.server.service.queue.processing.*;
55 42 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
56 43 import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
57 44 import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
58 45
59 46 import javax.annotation.PostConstruct;
60 47 import javax.annotation.PreDestroy;
61   -import java.util.Collections;
62   -import java.util.HashSet;
63   -import java.util.List;
64   -import java.util.Map;
65   -import java.util.Optional;
66   -import java.util.Set;
67   -import java.util.UUID;
68   -import java.util.concurrent.ConcurrentHashMap;
69   -import java.util.concurrent.ConcurrentMap;
70   -import java.util.concurrent.ExecutorService;
71   -import java.util.concurrent.Executors;
72   -import java.util.concurrent.TimeUnit;
  48 +import java.util.*;
  49 +import java.util.concurrent.*;
73 50
74 51 @Service
75 52 @TbRuleEngineComponent
76 53 @Slf4j
77 54 public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {
78 55
  56 + public static final String SUCCESSFUL_STATUS = "successful";
  57 + public static final String FAILED_STATUS = "failed";
79 58 @Value("${queue.rule-engine.poll-interval}")
80 59 private long pollDuration;
81 60 @Value("${queue.rule-engine.pack-processing-timeout}")
... ... @@ -170,7 +149,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
170 149 log.trace("[{}] Creating callback for message: {}", id, msg.getValue());
171 150 ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
172 151 TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
173   - TbMsgCallback callback = new TbMsgPackCallback(id, tenantId, ctx);
  152 + TbMsgCallback callback = statsEnabled ?
  153 + new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) :
  154 + new TbMsgPackCallback(id, tenantId, ctx);
174 155 try {
175 156 if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
176 157 forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.service.queue;
17 17
  18 +import io.micrometer.core.instrument.Timer;
18 19 import lombok.extern.slf4j.Slf4j;
19 20 import org.thingsboard.server.common.data.id.RuleNodeId;
20 21 import org.thingsboard.server.common.data.id.TenantId;
... ... @@ -23,28 +24,45 @@ import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
23 24 import org.thingsboard.server.common.msg.queue.TbMsgCallback;
24 25
25 26 import java.util.UUID;
  27 +import java.util.concurrent.TimeUnit;
26 28
27 29 @Slf4j
28 30 public class TbMsgPackCallback implements TbMsgCallback {
29 31 private final UUID id;
30 32 private final TenantId tenantId;
31 33 private final TbMsgPackProcessingContext ctx;
  34 + private final long startMsgProcessing;
  35 + private final Timer successfulMsgTimer;
  36 + private final Timer failedMsgTimer;
32 37
33 38 public TbMsgPackCallback(UUID id, TenantId tenantId, TbMsgPackProcessingContext ctx) {
  39 + this(id, tenantId, ctx, null, null);
  40 + }
  41 +
  42 + public TbMsgPackCallback(UUID id, TenantId tenantId, TbMsgPackProcessingContext ctx, Timer successfulMsgTimer, Timer failedMsgTimer) {
34 43 this.id = id;
35 44 this.tenantId = tenantId;
36 45 this.ctx = ctx;
  46 + this.successfulMsgTimer = successfulMsgTimer;
  47 + this.failedMsgTimer = failedMsgTimer;
  48 + startMsgProcessing = System.currentTimeMillis();
37 49 }
38 50
39 51 @Override
40 52 public void onSuccess() {
41 53 log.trace("[{}] ON SUCCESS", id);
  54 + if (successfulMsgTimer != null) {
  55 + successfulMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS);
  56 + }
42 57 ctx.onSuccess(id);
43 58 }
44 59
45 60 @Override
46 61 public void onFailure(RuleEngineException e) {
47 62 log.trace("[{}] ON FAILURE", id, e);
  63 + if (failedMsgTimer != null) {
  64 + failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS);
  65 + }
48 66 ctx.onFailure(tenantId, id, e);
49 67 }
50 68
... ...
... ... @@ -15,6 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.service.queue;
17 17
  18 +import io.micrometer.core.instrument.DistributionSummary;
  19 +import io.micrometer.core.instrument.Timer;
18 20 import lombok.extern.slf4j.Slf4j;
19 21 import org.thingsboard.server.common.data.id.TenantId;
20 22 import org.thingsboard.server.common.msg.queue.RuleEngineException;
... ... @@ -41,6 +43,8 @@ public class TbRuleEngineConsumerStats {
41 43 public static final String SUCCESSFUL_ITERATIONS = "successfulIterations";
42 44 public static final String FAILED_ITERATIONS = "failedIterations";
43 45
  46 + private final StatsFactory statsFactory;
  47 +
44 48 private final StatsCounter totalMsgCounter;
45 49 private final StatsCounter successMsgCounter;
46 50 private final StatsCounter tmpTimeoutMsgCounter;
... ... @@ -54,12 +58,14 @@ public class TbRuleEngineConsumerStats {
54 58
55 59 private final List<StatsCounter> counters = new ArrayList<>();
56 60 private final ConcurrentMap<UUID, TbTenantRuleEngineStats> tenantStats = new ConcurrentHashMap<>();
  61 + private final ConcurrentMap<TenantId, Timer> tenantMsgProcessTimers = new ConcurrentHashMap<>();
57 62 private final ConcurrentMap<TenantId, RuleEngineException> tenantExceptions = new ConcurrentHashMap<>();
58 63
59 64 private final String queueName;
60 65
61 66 public TbRuleEngineConsumerStats(String queueName, StatsFactory statsFactory) {
62 67 this.queueName = queueName;
  68 + this.statsFactory = statsFactory;
63 69
64 70 String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName;
65 71 this.totalMsgCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS);
... ... @@ -82,6 +88,14 @@ public class TbRuleEngineConsumerStats {
82 88 counters.add(failedIterationsCounter);
83 89 }
84 90
  91 + public Timer getTimer(TenantId tenantId, String status){
  92 + return tenantMsgProcessTimers.computeIfAbsent(tenantId,
  93 + id -> statsFactory.createTimer(StatsType.RULE_ENGINE.getName() + "." + queueName,
  94 + "tenantId", tenantId.getId().toString(),
  95 + "status", status
  96 + ));
  97 + }
  98 +
85 99 public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) {
86 100 int success = msg.getSuccessMap().size();
87 101 int pending = msg.getPendingMap().size();
... ...
... ... @@ -785,6 +785,10 @@ service:
785 785 metrics:
786 786 # Enable/disable actuator metrics.
787 787 enabled: "${METRICS_ENABLED:false}"
  788 + timer:
  789 + # Metrics percentiles returned by actuator for timer metrics. List of double values (divided by ,).
  790 + percentiles: "${METRICS_TIMER_PERCENTILES:0.5}"
  791 +
788 792
789 793 management:
790 794 endpoints:
... ...
... ... @@ -15,13 +15,13 @@
15 15 */
16 16 package org.thingsboard.server.common.stats;
17 17
18   -import io.micrometer.core.instrument.Counter;
19   -import io.micrometer.core.instrument.MeterRegistry;
20   -import io.micrometer.core.instrument.Tags;
  18 +import io.micrometer.core.instrument.*;
21 19 import org.springframework.beans.factory.annotation.Autowired;
22 20 import org.springframework.beans.factory.annotation.Value;
23 21 import org.springframework.stereotype.Service;
  22 +import org.springframework.util.StringUtils;
24 23
  24 +import javax.annotation.PostConstruct;
25 25 import java.util.concurrent.atomic.AtomicInteger;
26 26
27 27 @Service
... ... @@ -40,6 +40,23 @@ public class DefaultStatsFactory implements StatsFactory {
40 40 @Value("${metrics.enabled:false}")
41 41 private Boolean metricsEnabled;
42 42
  43 + @Value("${metrics.timer.percentiles:0.5}")
  44 + private String timerPercentilesStr;
  45 +
  46 + private double[] timerPercentiles;
  47 +
  48 + @PostConstruct
  49 + public void init() {
  50 + if (!StringUtils.isEmpty(timerPercentilesStr)) {
  51 + String[] split = timerPercentilesStr.split(",");
  52 + timerPercentiles = new double[split.length];
  53 + for (int i = 0; i < split.length; i++) {
  54 + timerPercentiles[i] = Double.parseDouble(split[i]);
  55 + }
  56 + }
  57 + }
  58 +
  59 +
43 60 @Override
44 61 public StatsCounter createStatsCounter(String key, String statsName) {
45 62 return new StatsCounter(
... ... @@ -74,9 +91,21 @@ public class DefaultStatsFactory implements StatsFactory {
74 91 return new DefaultMessagesStats(totalCounter, successfulCounter, failedCounter);
75 92 }
76 93
  94 + @Override
  95 + public Timer createTimer(String key, String... tags) {
  96 + Timer.Builder timerBuilder = Timer.builder(key)
  97 + .tags(tags)
  98 + .publishPercentiles();
  99 + if (timerPercentiles != null && timerPercentiles.length > 0) {
  100 + timerBuilder.publishPercentiles(timerPercentiles);
  101 + }
  102 + return timerBuilder.register(meterRegistry);
  103 + }
  104 +
77 105 private static class StubCounter implements Counter {
78 106 @Override
79   - public void increment(double amount) {}
  107 + public void increment(double amount) {
  108 + }
80 109
81 110 @Override
82 111 public double count() {
... ...
... ... @@ -15,6 +15,8 @@
15 15 */
16 16 package org.thingsboard.server.common.stats;
17 17
  18 +import io.micrometer.core.instrument.Timer;
  19 +
18 20 public interface StatsFactory {
19 21 StatsCounter createStatsCounter(String key, String statsName);
20 22
... ... @@ -23,4 +25,6 @@ public interface StatsFactory {
23 25 <T extends Number> T createGauge(String key, T number, String... tags);
24 26
25 27 MessagesStats createMessagesStats(String key);
  28 +
  29 + Timer createTimer(String key, String... tags);
26 30 }
... ...