Commit ef33c68727c5d0facba3d09fb7a5f5810a0d131c

Authored by vzikratyi-tb
Committed by GitHub
1 parent 9728478b

Added kafka consumer-groups statistics (#4123)

* Added kafka consumer-groups statistics

* Fixed PR notes

* Updated stats.kafka-response-timeout-ms

* Log kafka stats only once for CORE and RULE_ENGINE
... ... @@ -622,6 +622,10 @@ queue:
622 622 transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
623 623 notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1}"
624 624 js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100}"
  625 + consumer-stats:
  626 + enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
  627 + print-interval-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"
  628 + kafka-response-timeout-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_RESPONSE_TIMEOUT_MS:1000}"
625 629 aws_sqs:
626 630 use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
627 631 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.kafka;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Getter;
  20 +import lombok.NoArgsConstructor;
  21 +import org.springframework.beans.factory.annotation.Value;
  22 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  23 +import org.springframework.stereotype.Component;
  24 +
  25 +@Component
  26 +@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
  27 +@Getter
  28 +@AllArgsConstructor
  29 +@NoArgsConstructor
  30 +public class TbKafkaConsumerStatisticConfig {
  31 + @Value("${queue.kafka.consumer-stats.enabled:true}")
  32 + private Boolean enabled;
  33 + @Value("${queue.kafka.consumer-stats.print-interval-ms:60000}")
  34 + private Long printIntervalMs;
  35 + @Value("${queue.kafka.consumer-stats.kafka-response-timeout-ms:1000}")
  36 + private Long kafkaResponseTimeoutMs;
  37 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.kafka;
  17 +
  18 +import lombok.Builder;
  19 +import lombok.Data;
  20 +import lombok.RequiredArgsConstructor;
  21 +import lombok.extern.slf4j.Slf4j;
  22 +import org.apache.kafka.clients.admin.AdminClient;
  23 +import org.apache.kafka.clients.consumer.Consumer;
  24 +import org.apache.kafka.clients.consumer.ConsumerConfig;
  25 +import org.apache.kafka.clients.consumer.KafkaConsumer;
  26 +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  27 +import org.apache.kafka.common.TopicPartition;
  28 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  29 +import org.springframework.stereotype.Component;
  30 +import org.springframework.util.StringUtils;
  31 +import org.thingsboard.common.util.ThingsBoardThreadFactory;
  32 +import org.thingsboard.server.common.data.id.TenantId;
  33 +import org.thingsboard.server.common.msg.queue.ServiceType;
  34 +import org.thingsboard.server.queue.discovery.PartitionService;
  35 +
  36 +import javax.annotation.PostConstruct;
  37 +import javax.annotation.PreDestroy;
  38 +import java.time.Duration;
  39 +import java.util.ArrayList;
  40 +import java.util.List;
  41 +import java.util.Map;
  42 +import java.util.Properties;
  43 +import java.util.Set;
  44 +import java.util.concurrent.ConcurrentHashMap;
  45 +import java.util.concurrent.Executors;
  46 +import java.util.concurrent.ScheduledExecutorService;
  47 +import java.util.concurrent.TimeUnit;
  48 +
  49 +@Slf4j
  50 +@Component
  51 +@RequiredArgsConstructor
  52 +@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
  53 +public class TbKafkaConsumerStatsService {
  54 + private final Set<String> monitoredGroups = ConcurrentHashMap.newKeySet();
  55 +
  56 + private final TbKafkaSettings kafkaSettings;
  57 + private final TbKafkaConsumerStatisticConfig statsConfig;
  58 + private final PartitionService partitionService;
  59 +
  60 + private AdminClient adminClient;
  61 + private Consumer<String, byte[]> consumer;
  62 + private ScheduledExecutorService statsPrintScheduler;
  63 +
  64 + @PostConstruct
  65 + public void init() {
  66 + if (!statsConfig.getEnabled()) {
  67 + return;
  68 + }
  69 + this.adminClient = AdminClient.create(kafkaSettings.toAdminProps());
  70 + this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats"));
  71 +
  72 + Properties consumerProps = kafkaSettings.toConsumerProps();
  73 + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-stats-loader-client");
  74 + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-stats-loader-client-group");
  75 + this.consumer = new KafkaConsumer<>(consumerProps);
  76 +
  77 + startLogScheduling();
  78 + }
  79 +
  80 + private void startLogScheduling() {
  81 + Duration timeoutDuration = Duration.ofMillis(statsConfig.getKafkaResponseTimeoutMs());
  82 + statsPrintScheduler.scheduleWithFixedDelay(() -> {
  83 + if (!isStatsPrintRequired()) {
  84 + return;
  85 + }
  86 + for (String groupId : monitoredGroups) {
  87 + try {
  88 + Map<TopicPartition, OffsetAndMetadata> groupOffsets = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
  89 + .get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS);
  90 + Map<TopicPartition, Long> endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration);
  91 +
  92 + List<GroupTopicStats> lagTopicsStats = getTopicsStatsWithLag(groupOffsets, endOffsets);
  93 + if (!lagTopicsStats.isEmpty()) {
  94 + StringBuilder builder = new StringBuilder();
  95 + for (int i = 0; i < lagTopicsStats.size(); i++) {
  96 + builder.append(lagTopicsStats.get(i).toString());
  97 + if (i != lagTopicsStats.size() - 1) {
  98 + builder.append(", ");
  99 + }
  100 + }
  101 + log.info("[{}] Topic partitions with lag: [{}].", groupId, builder.toString());
  102 + }
  103 + } catch (Exception e) {
  104 + log.warn("[{}] Failed to get consumer group stats. Reason - {}.", groupId, e.getMessage());
  105 + log.trace("Detailed error: ", e);
  106 + }
  107 + }
  108 +
  109 + }, statsConfig.getPrintIntervalMs(), statsConfig.getPrintIntervalMs(), TimeUnit.MILLISECONDS);
  110 + }
  111 +
  112 + private boolean isStatsPrintRequired() {
  113 + boolean isMyRuleEnginePartition = partitionService.resolve(ServiceType.TB_RULE_ENGINE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
  114 + boolean isMyCorePartition = partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
  115 + return log.isInfoEnabled() && (isMyRuleEnginePartition || isMyCorePartition);
  116 + }
  117 +
  118 + private List<GroupTopicStats> getTopicsStatsWithLag(Map<TopicPartition, OffsetAndMetadata> groupOffsets, Map<TopicPartition, Long> endOffsets) {
  119 + List<GroupTopicStats> consumerGroupStats = new ArrayList<>();
  120 + for (TopicPartition topicPartition : groupOffsets.keySet()) {
  121 + long endOffset = endOffsets.get(topicPartition);
  122 + long committedOffset = groupOffsets.get(topicPartition).offset();
  123 + long lag = endOffset - committedOffset;
  124 + if (lag != 0) {
  125 + GroupTopicStats groupTopicStats = GroupTopicStats.builder()
  126 + .topic(topicPartition.topic())
  127 + .partition(topicPartition.partition())
  128 + .committedOffset(committedOffset)
  129 + .endOffset(endOffset)
  130 + .lag(lag)
  131 + .build();
  132 + consumerGroupStats.add(groupTopicStats);
  133 + }
  134 + }
  135 + return consumerGroupStats;
  136 + }
  137 +
  138 + public void registerClientGroup(String groupId) {
  139 + if (statsConfig.getEnabled() && !StringUtils.isEmpty(groupId)) {
  140 + monitoredGroups.add(groupId);
  141 + }
  142 + }
  143 +
  144 + public void unregisterClientGroup(String groupId) {
  145 + if (statsConfig.getEnabled() && !StringUtils.isEmpty(groupId)) {
  146 + monitoredGroups.remove(groupId);
  147 + }
  148 + }
  149 +
  150 + @PreDestroy
  151 + public void destroy() {
  152 + if (statsPrintScheduler != null) {
  153 + statsPrintScheduler.shutdownNow();
  154 + }
  155 + if (adminClient != null) {
  156 + adminClient.close();
  157 + }
  158 + if (consumer != null) {
  159 + consumer.close();
  160 + }
  161 + }
  162 +
  163 +
  164 + @Builder
  165 + @Data
  166 + private static class GroupTopicStats {
  167 + private String topic;
  168 + private int partition;
  169 + private long committedOffset;
  170 + private long endOffset;
  171 + private long lag;
  172 +
  173 + @Override
  174 + public String toString() {
  175 + return "[" +
  176 + "topic=[" + topic + ']' +
  177 + ", partition=[" + partition + "]" +
  178 + ", committedOffset=[" + committedOffset + "]" +
  179 + ", endOffset=[" + endOffset + "]" +
  180 + ", lag=[" + lag + "]" +
  181 + "]";
  182 + }
  183 + }
  184 +}
... ...
... ... @@ -42,10 +42,13 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
42 42 private final KafkaConsumer<String, byte[]> consumer;
43 43 private final TbKafkaDecoder<T> decoder;
44 44
  45 + private final TbKafkaConsumerStatsService statsService;
  46 + private final String groupId;
  47 +
45 48 @Builder
46 49 private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
47 50 String clientId, String groupId, String topic,
48   - TbQueueAdmin admin) {
  51 + TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) {
49 52 super(topic);
50 53 Properties props = settings.toConsumerProps();
51 54 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
... ... @@ -53,6 +56,13 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
53 56 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
54 57 }
55 58
  59 + this.statsService = statsService;
  60 + this.groupId = groupId;
  61 +
  62 + if (statsService != null) {
  63 + statsService.registerClientGroup(groupId);
  64 + }
  65 +
56 66 this.admin = admin;
57 67 this.consumer = new KafkaConsumer<>(props);
58 68 this.decoder = decoder;
... ... @@ -96,6 +106,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
96 106 consumer.unsubscribe();
97 107 consumer.close();
98 108 }
  109 + if (statsService != null) {
  110 + statsService.unregisterClientGroup(groupId);
  111 + }
99 112 }
100   -
101 113 }
... ...
... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
39 39 import org.thingsboard.server.queue.discovery.PartitionService;
40 40 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
41 41 import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
  42 +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
42 43 import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
43 44 import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
44 45 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
... ... @@ -65,6 +66,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
65 66 private final TbQueueTransportApiSettings transportApiSettings;
66 67 private final TbQueueTransportNotificationSettings transportNotificationSettings;
67 68 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
  69 + private final TbKafkaConsumerStatsService consumerStatsService;
68 70
69 71 private final TbQueueAdmin coreAdmin;
70 72 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -79,6 +81,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
79 81 TbQueueTransportApiSettings transportApiSettings,
80 82 TbQueueTransportNotificationSettings transportNotificationSettings,
81 83 TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  84 + TbKafkaConsumerStatsService consumerStatsService,
82 85 TbKafkaTopicConfigs kafkaTopicConfigs) {
83 86 this.partitionService = partitionService;
84 87 this.kafkaSettings = kafkaSettings;
... ... @@ -88,6 +91,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
88 91 this.transportApiSettings = transportApiSettings;
89 92 this.transportNotificationSettings = transportNotificationSettings;
90 93 this.jsInvokeSettings = jsInvokeSettings;
  94 + this.consumerStatsService = consumerStatsService;
91 95
92 96 this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
93 97 this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
... ... @@ -156,6 +160,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
156 160 consumerBuilder.groupId("re-" + queueName + "-consumer");
157 161 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
158 162 consumerBuilder.admin(ruleEngineAdmin);
  163 + consumerBuilder.statsService(consumerStatsService);
159 164 return consumerBuilder.build();
160 165 }
161 166
... ... @@ -168,6 +173,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
168 173 consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
169 174 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
170 175 consumerBuilder.admin(notificationAdmin);
  176 + consumerBuilder.statsService(consumerStatsService);
171 177 return consumerBuilder.build();
172 178 }
173 179
... ... @@ -180,6 +186,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
180 186 consumerBuilder.groupId("monolith-core-consumer");
181 187 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
182 188 consumerBuilder.admin(coreAdmin);
  189 + consumerBuilder.statsService(consumerStatsService);
183 190 return consumerBuilder.build();
184 191 }
185 192
... ... @@ -192,6 +199,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
192 199 consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
193 200 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
194 201 consumerBuilder.admin(notificationAdmin);
  202 + consumerBuilder.statsService(consumerStatsService);
195 203 return consumerBuilder.build();
196 204 }
197 205
... ... @@ -204,6 +212,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
204 212 consumerBuilder.groupId("monolith-transport-api-consumer");
205 213 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
206 214 consumerBuilder.admin(transportApiAdmin);
  215 + consumerBuilder.statsService(consumerStatsService);
207 216 return consumerBuilder.build();
208 217 }
209 218
... ... @@ -237,6 +246,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
237 246 return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
238 247 }
239 248 );
  249 + responseBuilder.statsService(consumerStatsService);
240 250 responseBuilder.admin(jsExecutorAdmin);
241 251
242 252 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
... ... @@ -259,6 +269,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
259 269 consumerBuilder.groupId("monolith-us-consumer");
260 270 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
261 271 consumerBuilder.admin(coreAdmin);
  272 + consumerBuilder.statsService(consumerStatsService);
262 273 return consumerBuilder.build();
263 274 }
264 275
... ...
... ... @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
39 39 import org.thingsboard.server.queue.discovery.PartitionService;
40 40 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
41 41 import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
  42 +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
42 43 import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
43 44 import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
44 45 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
... ... @@ -62,6 +63,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
62 63 private final TbQueueRuleEngineSettings ruleEngineSettings;
63 64 private final TbQueueTransportApiSettings transportApiSettings;
64 65 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
  66 + private final TbKafkaConsumerStatsService consumerStatsService;
65 67
66 68 private final TbQueueAdmin coreAdmin;
67 69 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -75,6 +77,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
75 77 TbQueueRuleEngineSettings ruleEngineSettings,
76 78 TbQueueTransportApiSettings transportApiSettings,
77 79 TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  80 + TbKafkaConsumerStatsService consumerStatsService,
78 81 TbKafkaTopicConfigs kafkaTopicConfigs) {
79 82 this.partitionService = partitionService;
80 83 this.kafkaSettings = kafkaSettings;
... ... @@ -83,6 +86,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
83 86 this.ruleEngineSettings = ruleEngineSettings;
84 87 this.transportApiSettings = transportApiSettings;
85 88 this.jsInvokeSettings = jsInvokeSettings;
  89 + this.consumerStatsService = consumerStatsService;
86 90
87 91 this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
88 92 this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
... ... @@ -150,6 +154,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
150 154 consumerBuilder.groupId("tb-core-node");
151 155 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
152 156 consumerBuilder.admin(coreAdmin);
  157 + consumerBuilder.statsService(consumerStatsService);
153 158 return consumerBuilder.build();
154 159 }
155 160
... ... @@ -162,6 +167,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
162 167 consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId());
163 168 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
164 169 consumerBuilder.admin(notificationAdmin);
  170 + consumerBuilder.statsService(consumerStatsService);
165 171 return consumerBuilder.build();
166 172 }
167 173
... ... @@ -174,6 +180,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
174 180 consumerBuilder.groupId("tb-core-transport-api-consumer");
175 181 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
176 182 consumerBuilder.admin(transportApiAdmin);
  183 + consumerBuilder.statsService(consumerStatsService);
177 184 return consumerBuilder.build();
178 185 }
179 186
... ... @@ -208,6 +215,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
208 215 }
209 216 );
210 217 responseBuilder.admin(jsExecutorAdmin);
  218 + responseBuilder.statsService(consumerStatsService);
211 219
212 220 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
213 221 <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
... ... @@ -229,6 +237,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
229 237 consumerBuilder.groupId("tb-core-us-consumer");
230 238 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
231 239 consumerBuilder.admin(coreAdmin);
  240 + consumerBuilder.statsService(consumerStatsService);
232 241 return consumerBuilder.build();
233 242 }
234 243
... ...
... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
37 37 import org.thingsboard.server.queue.discovery.PartitionService;
38 38 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
39 39 import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
  40 +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
40 41 import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
41 42 import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
42 43 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
... ... @@ -59,6 +60,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
59 60 private final TbQueueCoreSettings coreSettings;
60 61 private final TbQueueRuleEngineSettings ruleEngineSettings;
61 62 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
  63 + private final TbKafkaConsumerStatsService consumerStatsService;
62 64
63 65 private final TbQueueAdmin coreAdmin;
64 66 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -70,6 +72,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
70 72 TbQueueCoreSettings coreSettings,
71 73 TbQueueRuleEngineSettings ruleEngineSettings,
72 74 TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  75 + TbKafkaConsumerStatsService consumerStatsService,
73 76 TbKafkaTopicConfigs kafkaTopicConfigs) {
74 77 this.partitionService = partitionService;
75 78 this.kafkaSettings = kafkaSettings;
... ... @@ -77,6 +80,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
77 80 this.coreSettings = coreSettings;
78 81 this.ruleEngineSettings = ruleEngineSettings;
79 82 this.jsInvokeSettings = jsInvokeSettings;
  83 + this.consumerStatsService = consumerStatsService;
80 84
81 85 this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
82 86 this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
... ... @@ -145,6 +149,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
145 149 consumerBuilder.groupId("re-" + queueName + "-consumer");
146 150 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
147 151 consumerBuilder.admin(ruleEngineAdmin);
  152 + consumerBuilder.statsService(consumerStatsService);
148 153 return consumerBuilder.build();
149 154 }
150 155
... ... @@ -157,6 +162,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
157 162 consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId());
158 163 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
159 164 consumerBuilder.admin(notificationAdmin);
  165 + consumerBuilder.statsService(consumerStatsService);
160 166 return consumerBuilder.build();
161 167 }
162 168
... ... @@ -181,6 +187,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
181 187 }
182 188 );
183 189 responseBuilder.admin(jsExecutorAdmin);
  190 + responseBuilder.statsService(consumerStatsService);
184 191
185 192 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
186 193 <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
... ...
... ... @@ -32,6 +32,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
32 32 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
33 33 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
34 34 import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
  35 +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
35 36 import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
36 37 import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
37 38 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
... ... @@ -54,6 +55,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
54 55 private final TbQueueRuleEngineSettings ruleEngineSettings;
55 56 private final TbQueueTransportApiSettings transportApiSettings;
56 57 private final TbQueueTransportNotificationSettings transportNotificationSettings;
  58 + private final TbKafkaConsumerStatsService consumerStatsService;
57 59
58 60 private final TbQueueAdmin coreAdmin;
59 61 private final TbQueueAdmin ruleEngineAdmin;
... ... @@ -66,6 +68,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
66 68 TbQueueRuleEngineSettings ruleEngineSettings,
67 69 TbQueueTransportApiSettings transportApiSettings,
68 70 TbQueueTransportNotificationSettings transportNotificationSettings,
  71 + TbKafkaConsumerStatsService consumerStatsService,
69 72 TbKafkaTopicConfigs kafkaTopicConfigs) {
70 73 this.kafkaSettings = kafkaSettings;
71 74 this.serviceInfoProvider = serviceInfoProvider;
... ... @@ -73,6 +76,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
73 76 this.ruleEngineSettings = ruleEngineSettings;
74 77 this.transportApiSettings = transportApiSettings;
75 78 this.transportNotificationSettings = transportNotificationSettings;
  79 + this.consumerStatsService = consumerStatsService;
76 80
77 81 this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
78 82 this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
... ... @@ -95,6 +99,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
95 99 responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
96 100 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
97 101 responseBuilder.admin(transportApiAdmin);
  102 + responseBuilder.statsService(consumerStatsService);
98 103
99 104 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
100 105 <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
... ... @@ -136,6 +141,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
136 141 responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
137 142 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
138 143 responseBuilder.admin(notificationAdmin);
  144 + responseBuilder.statsService(consumerStatsService);
139 145 return responseBuilder.build();
140 146 }
141 147
... ...