Commit 338b7492c9a58a5d794fe92843bfc74f261c64da
Committed by
GitHub
Merge pull request #3283 from vzikratyi-tb/feature/ack-time-metrics
Track time to acknowledge messages (in actuator/prometheus)
Showing
6 changed files
with
83 additions
and
33 deletions
... | ... | @@ -22,17 +22,11 @@ import org.springframework.scheduling.annotation.Scheduled; |
22 | 22 | import org.springframework.stereotype.Service; |
23 | 23 | import org.thingsboard.rule.engine.api.RpcError; |
24 | 24 | import org.thingsboard.server.actors.ActorSystemContext; |
25 | -import org.thingsboard.server.common.data.id.RuleNodeId; | |
26 | 25 | import org.thingsboard.server.common.data.id.TenantId; |
27 | 26 | import org.thingsboard.server.common.msg.TbActorMsg; |
28 | 27 | import org.thingsboard.server.common.msg.TbMsg; |
29 | -import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; | |
30 | -import org.thingsboard.server.common.msg.queue.RuleEngineException; | |
31 | -import org.thingsboard.server.common.msg.queue.RuleNodeInfo; | |
32 | -import org.thingsboard.server.common.msg.queue.ServiceQueue; | |
33 | -import org.thingsboard.server.common.msg.queue.ServiceType; | |
34 | -import org.thingsboard.server.common.msg.queue.TbCallback; | |
35 | -import org.thingsboard.server.common.msg.queue.TbMsgCallback; | |
28 | +import org.thingsboard.server.common.msg.queue.*; | |
29 | +import org.thingsboard.server.common.stats.StatsFactory; | |
36 | 30 | import org.thingsboard.server.gen.transport.TransportProtos; |
37 | 31 | import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; |
38 | 32 | import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; |
... | ... | @@ -42,40 +36,25 @@ import org.thingsboard.server.queue.discovery.PartitionChangeEvent; |
42 | 36 | import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; |
43 | 37 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
44 | 38 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
45 | -import org.thingsboard.server.common.stats.StatsFactory; | |
46 | 39 | import org.thingsboard.server.queue.util.TbRuleEngineComponent; |
47 | 40 | import org.thingsboard.server.service.encoding.DataDecodingEncodingService; |
48 | -import org.thingsboard.server.service.queue.processing.AbstractConsumerService; | |
49 | -import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision; | |
50 | -import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; | |
51 | -import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy; | |
52 | -import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; | |
53 | -import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; | |
54 | -import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory; | |
41 | +import org.thingsboard.server.service.queue.processing.*; | |
55 | 42 | import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
56 | 43 | import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; |
57 | 44 | import org.thingsboard.server.service.stats.RuleEngineStatisticsService; |
58 | 45 | |
59 | 46 | import javax.annotation.PostConstruct; |
60 | 47 | import javax.annotation.PreDestroy; |
61 | -import java.util.Collections; | |
62 | -import java.util.HashSet; | |
63 | -import java.util.List; | |
64 | -import java.util.Map; | |
65 | -import java.util.Optional; | |
66 | -import java.util.Set; | |
67 | -import java.util.UUID; | |
68 | -import java.util.concurrent.ConcurrentHashMap; | |
69 | -import java.util.concurrent.ConcurrentMap; | |
70 | -import java.util.concurrent.ExecutorService; | |
71 | -import java.util.concurrent.Executors; | |
72 | -import java.util.concurrent.TimeUnit; | |
48 | +import java.util.*; | |
49 | +import java.util.concurrent.*; | |
73 | 50 | |
74 | 51 | @Service |
75 | 52 | @TbRuleEngineComponent |
76 | 53 | @Slf4j |
77 | 54 | public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService { |
78 | 55 | |
56 | + public static final String SUCCESSFUL_STATUS = "successful"; | |
57 | + public static final String FAILED_STATUS = "failed"; | |
79 | 58 | @Value("${queue.rule-engine.poll-interval}") |
80 | 59 | private long pollDuration; |
81 | 60 | @Value("${queue.rule-engine.pack-processing-timeout}") |
... | ... | @@ -170,7 +149,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< |
170 | 149 | log.trace("[{}] Creating callback for message: {}", id, msg.getValue()); |
171 | 150 | ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); |
172 | 151 | TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB())); |
173 | - TbMsgCallback callback = new TbMsgPackCallback(id, tenantId, ctx); | |
152 | + TbMsgCallback callback = statsEnabled ? | |
153 | + new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : | |
154 | + new TbMsgPackCallback(id, tenantId, ctx); | |
174 | 155 | try { |
175 | 156 | if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) { |
176 | 157 | forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback); | ... | ... |
... | ... | @@ -15,6 +15,7 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.service.queue; |
17 | 17 | |
18 | +import io.micrometer.core.instrument.Timer; | |
18 | 19 | import lombok.extern.slf4j.Slf4j; |
19 | 20 | import org.thingsboard.server.common.data.id.RuleNodeId; |
20 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
... | ... | @@ -23,28 +24,45 @@ import org.thingsboard.server.common.msg.queue.RuleNodeInfo; |
23 | 24 | import org.thingsboard.server.common.msg.queue.TbMsgCallback; |
24 | 25 | |
25 | 26 | import java.util.UUID; |
27 | +import java.util.concurrent.TimeUnit; | |
26 | 28 | |
27 | 29 | @Slf4j |
28 | 30 | public class TbMsgPackCallback implements TbMsgCallback { |
29 | 31 | private final UUID id; |
30 | 32 | private final TenantId tenantId; |
31 | 33 | private final TbMsgPackProcessingContext ctx; |
34 | + private final long startMsgProcessing; | |
35 | + private final Timer successfulMsgTimer; | |
36 | + private final Timer failedMsgTimer; | |
32 | 37 | |
33 | 38 | public TbMsgPackCallback(UUID id, TenantId tenantId, TbMsgPackProcessingContext ctx) { |
39 | + this(id, tenantId, ctx, null, null); | |
40 | + } | |
41 | + | |
42 | + public TbMsgPackCallback(UUID id, TenantId tenantId, TbMsgPackProcessingContext ctx, Timer successfulMsgTimer, Timer failedMsgTimer) { | |
34 | 43 | this.id = id; |
35 | 44 | this.tenantId = tenantId; |
36 | 45 | this.ctx = ctx; |
46 | + this.successfulMsgTimer = successfulMsgTimer; | |
47 | + this.failedMsgTimer = failedMsgTimer; | |
48 | + startMsgProcessing = System.currentTimeMillis(); | |
37 | 49 | } |
38 | 50 | |
39 | 51 | @Override |
40 | 52 | public void onSuccess() { |
41 | 53 | log.trace("[{}] ON SUCCESS", id); |
54 | + if (successfulMsgTimer != null) { | |
55 | + successfulMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS); | |
56 | + } | |
42 | 57 | ctx.onSuccess(id); |
43 | 58 | } |
44 | 59 | |
45 | 60 | @Override |
46 | 61 | public void onFailure(RuleEngineException e) { |
47 | 62 | log.trace("[{}] ON FAILURE", id, e); |
63 | + if (failedMsgTimer != null) { | |
64 | + failedMsgTimer.record(System.currentTimeMillis() - startMsgProcessing, TimeUnit.MILLISECONDS); | |
65 | + } | |
48 | 66 | ctx.onFailure(tenantId, id, e); |
49 | 67 | } |
50 | 68 | ... | ... |
... | ... | @@ -15,6 +15,8 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.service.queue; |
17 | 17 | |
18 | +import io.micrometer.core.instrument.DistributionSummary; | |
19 | +import io.micrometer.core.instrument.Timer; | |
18 | 20 | import lombok.extern.slf4j.Slf4j; |
19 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
20 | 22 | import org.thingsboard.server.common.msg.queue.RuleEngineException; |
... | ... | @@ -41,6 +43,8 @@ public class TbRuleEngineConsumerStats { |
41 | 43 | public static final String SUCCESSFUL_ITERATIONS = "successfulIterations"; |
42 | 44 | public static final String FAILED_ITERATIONS = "failedIterations"; |
43 | 45 | |
46 | + private final StatsFactory statsFactory; | |
47 | + | |
44 | 48 | private final StatsCounter totalMsgCounter; |
45 | 49 | private final StatsCounter successMsgCounter; |
46 | 50 | private final StatsCounter tmpTimeoutMsgCounter; |
... | ... | @@ -54,12 +58,14 @@ public class TbRuleEngineConsumerStats { |
54 | 58 | |
55 | 59 | private final List<StatsCounter> counters = new ArrayList<>(); |
56 | 60 | private final ConcurrentMap<UUID, TbTenantRuleEngineStats> tenantStats = new ConcurrentHashMap<>(); |
61 | + private final ConcurrentMap<TenantId, Timer> tenantMsgProcessTimers = new ConcurrentHashMap<>(); | |
57 | 62 | private final ConcurrentMap<TenantId, RuleEngineException> tenantExceptions = new ConcurrentHashMap<>(); |
58 | 63 | |
59 | 64 | private final String queueName; |
60 | 65 | |
61 | 66 | public TbRuleEngineConsumerStats(String queueName, StatsFactory statsFactory) { |
62 | 67 | this.queueName = queueName; |
68 | + this.statsFactory = statsFactory; | |
63 | 69 | |
64 | 70 | String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName; |
65 | 71 | this.totalMsgCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS); |
... | ... | @@ -82,6 +88,14 @@ public class TbRuleEngineConsumerStats { |
82 | 88 | counters.add(failedIterationsCounter); |
83 | 89 | } |
84 | 90 | |
91 | + public Timer getTimer(TenantId tenantId, String status){ | |
92 | + return tenantMsgProcessTimers.computeIfAbsent(tenantId, | |
93 | + id -> statsFactory.createTimer(StatsType.RULE_ENGINE.getName() + "." + queueName, | |
94 | + "tenantId", tenantId.getId().toString(), | |
95 | + "status", status | |
96 | + )); | |
97 | + } | |
98 | + | |
85 | 99 | public void log(TbRuleEngineProcessingResult msg, boolean finalIterationForPack) { |
86 | 100 | int success = msg.getSuccessMap().size(); |
87 | 101 | int pending = msg.getPendingMap().size(); | ... | ... |
... | ... | @@ -783,6 +783,10 @@ service: |
783 | 783 | metrics: |
784 | 784 | # Enable/disable actuator metrics. |
785 | 785 | enabled: "${METRICS_ENABLED:false}" |
786 | + timer: | |
787 | + # Metrics percentiles returned by actuator for timer metrics. List of double values (divided by ,). | |
788 | + percentiles: "${METRICS_TIMER_PERCENTILES:0.5}" | |
789 | + | |
786 | 790 | |
787 | 791 | management: |
788 | 792 | endpoints: | ... | ... |
... | ... | @@ -15,13 +15,13 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.stats; |
17 | 17 | |
18 | -import io.micrometer.core.instrument.Counter; | |
19 | -import io.micrometer.core.instrument.MeterRegistry; | |
20 | -import io.micrometer.core.instrument.Tags; | |
18 | +import io.micrometer.core.instrument.*; | |
21 | 19 | import org.springframework.beans.factory.annotation.Autowired; |
22 | 20 | import org.springframework.beans.factory.annotation.Value; |
23 | 21 | import org.springframework.stereotype.Service; |
22 | +import org.springframework.util.StringUtils; | |
24 | 23 | |
24 | +import javax.annotation.PostConstruct; | |
25 | 25 | import java.util.concurrent.atomic.AtomicInteger; |
26 | 26 | |
27 | 27 | @Service |
... | ... | @@ -40,6 +40,23 @@ public class DefaultStatsFactory implements StatsFactory { |
40 | 40 | @Value("${metrics.enabled:false}") |
41 | 41 | private Boolean metricsEnabled; |
42 | 42 | |
43 | + @Value("${metrics.timer.percentiles:0.5}") | |
44 | + private String timerPercentilesStr; | |
45 | + | |
46 | + private double[] timerPercentiles; | |
47 | + | |
48 | + @PostConstruct | |
49 | + public void init() { | |
50 | + if (!StringUtils.isEmpty(timerPercentilesStr)) { | |
51 | + String[] split = timerPercentilesStr.split(","); | |
52 | + timerPercentiles = new double[split.length]; | |
53 | + for (int i = 0; i < split.length; i++) { | |
54 | + timerPercentiles[i] = Double.parseDouble(split[i]); | |
55 | + } | |
56 | + } | |
57 | + } | |
58 | + | |
59 | + | |
43 | 60 | @Override |
44 | 61 | public StatsCounter createStatsCounter(String key, String statsName) { |
45 | 62 | return new StatsCounter( |
... | ... | @@ -74,9 +91,21 @@ public class DefaultStatsFactory implements StatsFactory { |
74 | 91 | return new DefaultMessagesStats(totalCounter, successfulCounter, failedCounter); |
75 | 92 | } |
76 | 93 | |
94 | + @Override | |
95 | + public Timer createTimer(String key, String... tags) { | |
96 | + Timer.Builder timerBuilder = Timer.builder(key) | |
97 | + .tags(tags) | |
98 | + .publishPercentiles(); | |
99 | + if (timerPercentiles != null && timerPercentiles.length > 0) { | |
100 | + timerBuilder.publishPercentiles(timerPercentiles); | |
101 | + } | |
102 | + return timerBuilder.register(meterRegistry); | |
103 | + } | |
104 | + | |
77 | 105 | private static class StubCounter implements Counter { |
78 | 106 | @Override |
79 | - public void increment(double amount) {} | |
107 | + public void increment(double amount) { | |
108 | + } | |
80 | 109 | |
81 | 110 | @Override |
82 | 111 | public double count() { | ... | ... |
... | ... | @@ -15,6 +15,8 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.stats; |
17 | 17 | |
18 | +import io.micrometer.core.instrument.Timer; | |
19 | + | |
18 | 20 | public interface StatsFactory { |
19 | 21 | StatsCounter createStatsCounter(String key, String statsName); |
20 | 22 | |
... | ... | @@ -23,4 +25,6 @@ public interface StatsFactory { |
23 | 25 | <T extends Number> T createGauge(String key, T number, String... tags); |
24 | 26 | |
25 | 27 | MessagesStats createMessagesStats(String key); |
28 | + | |
29 | + Timer createTimer(String key, String... tags); | |
26 | 30 | } | ... | ... |