Commit 6165b9083949a646642e1d5148caa5e3e0c3164a

Authored by Andrew Shvayka
1 parent 4255bb56

Rate limitter for Kafka consumer

... ... @@ -17,6 +17,11 @@ package org.thingsboard.server.service.transport;
17 17
18 18 import akka.actor.ActorRef;
19 19 import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import io.github.bucket4j.Bandwidth;
  21 +import io.github.bucket4j.BlockingBucket;
  22 +import io.github.bucket4j.Bucket4j;
  23 +import io.github.bucket4j.local.LocalBucket;
  24 +import io.github.bucket4j.local.LocalBucketBuilder;
20 25 import lombok.extern.slf4j.Slf4j;
21 26 import org.apache.kafka.clients.consumer.ConsumerRecords;
22 27 import org.apache.kafka.clients.producer.Callback;
... ... @@ -49,6 +54,7 @@ import java.util.Optional;
49 54 import java.util.UUID;
50 55 import java.util.concurrent.ExecutorService;
51 56 import java.util.concurrent.Executors;
  57 +import java.util.concurrent.TimeUnit;
52 58 import java.util.function.Consumer;
53 59
54 60 /**
... ... @@ -68,6 +74,13 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
68 74 @Value("${transport.remote.rule_engine.auto_commit_interval}")
69 75 private int autoCommitInterval;
70 76
  77 + @Value("${transport.remote.rule_engine.poll_records_pack_size}")
  78 + private long pollRecordsPackSize;
  79 + @Value("${transport.remote.rule_engine.max_poll_records_per_second}")
  80 + private long pollRecordsPerSecond;
  81 + @Value("${transport.remote.rule_engine.max_poll_records_per_minute}")
  82 + private long pollRecordsPerMinute;
  83 +
71 84 @Autowired
72 85 private TbKafkaSettings kafkaSettings;
73 86
... ... @@ -109,15 +122,29 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
109 122 ruleEngineConsumerBuilder.groupId("tb-node");
110 123 ruleEngineConsumerBuilder.autoCommit(true);
111 124 ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);
  125 + ruleEngineConsumerBuilder.maxPollRecords(pollRecordsPackSize);
112 126 ruleEngineConsumerBuilder.decoder(new ToRuleEngineMsgDecoder());
113 127
114 128 ruleEngineConsumer = ruleEngineConsumerBuilder.build();
115 129 ruleEngineConsumer.subscribe();
116 130
  131 + LocalBucketBuilder builder = Bucket4j.builder();
  132 + builder.addLimit(Bandwidth.simple(pollRecordsPerSecond, Duration.ofSeconds(1)));
  133 + builder.addLimit(Bandwidth.simple(pollRecordsPerMinute, Duration.ofMinutes(1)));
  134 + LocalBucket pollRateBucket = builder.build();
  135 + BlockingBucket blockingPollRateBucket = pollRateBucket.asScheduler();
  136 +
117 137 mainConsumerExecutor.execute(() -> {
118 138 while (!stopped) {
119 139 try {
120 140 ConsumerRecords<String, byte[]> records = ruleEngineConsumer.poll(Duration.ofMillis(pollDuration));
  141 + int recordsCount = records.count();
  142 + if (recordsCount > 0) {
  143 + while (!blockingPollRateBucket.tryConsume(recordsCount, TimeUnit.SECONDS.toNanos(5))) {
  144 + log.info("Rule Engine consumer is busy. Required tokens: [{}]. Available tokens: [{}].", recordsCount, pollRateBucket.getAvailableTokens());
  145 + }
  146 + log.trace("Processing {} records", recordsCount);
  147 + }
121 148 records.forEach(record -> {
122 149 try {
123 150 ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
... ...
... ... @@ -393,6 +393,9 @@ transport:
393 393 topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
394 394 poll_interval: "${TB_RULE_ENGINE_POLL_INTERVAL_MS:25}"
395 395 auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
  396 + poll_records_pack_size: "${TB_RULE_ENGINE_MAX_POLL_RECORDS:1000}"
  397 + max_poll_records_per_second: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:10000}"
  398 + max_poll_records_per_minute: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:120000}"
396 399 notifications:
397 400 topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
398 401 sessions:
... ...
... ... @@ -46,7 +46,8 @@ public class TBKafkaConsumerTemplate<T> {
46 46 private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
47 47 TbKafkaRequestIdExtractor<T> requestIdExtractor,
48 48 String clientId, String groupId, String topic,
49   - boolean autoCommit, int autoCommitIntervalMs) {
  49 + boolean autoCommit, int autoCommitIntervalMs,
  50 + long maxPollRecords) {
50 51 Properties props = settings.toProps();
51 52 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
52 53 if (groupId != null) {
... ... @@ -56,6 +57,7 @@ public class TBKafkaConsumerTemplate<T> {
56 57 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
57 58 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
58 59 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  60 + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
59 61 this.consumer = new KafkaConsumer<>(props);
60 62 this.decoder = decoder;
61 63 this.requestIdExtractor = requestIdExtractor;
... ...