Commit 424d631a69c945387b7d16cec07cf9d407879303

Authored by Yevhen Bondarenko
Committed by GitHub
1 parent f2757e05

added extra properties to kafka consumer (#2751)

* added extra properties to kafka consumer

* added default values for kafka consumer properties in TbKafkaSettings

* Update TbKafkaSettings.java

* Update thingsboard.yml

Co-authored-by: Andrew Shvayka <ashvayka@thingsboard.io>
@@ -584,6 +584,9 @@ queue: @@ -584,6 +584,9 @@ queue:
584 linger.ms: "${TB_KAFKA_LINGER_MS:1}" 584 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
585 buffer.memory: "${TB_BUFFER_MEMORY:33554432}" 585 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
586 replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" 586 replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
  587 + max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
  588 + max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
  589 + fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
587 topic-properties: 590 topic-properties:
588 rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" 591 rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
589 core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" 592 core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
@@ -54,6 +54,9 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue @@ -54,6 +54,9 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
54 if (groupId != null) { 54 if (groupId != null) {
55 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 55 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
56 } 56 }
  57 + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, settings.getMaxPollRecords());
  58 + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, settings.getMaxPartitionFetchBytes());
  59 + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, settings.getFetchMaxBytes());
57 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); 60 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
58 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs); 61 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
59 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 62 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@@ -55,6 +55,18 @@ public class TbKafkaSettings { @@ -55,6 +55,18 @@ public class TbKafkaSettings {
55 @Getter 55 @Getter
56 private short replicationFactor; 56 private short replicationFactor;
57 57
  58 + @Value("${queue.kafka.max_poll_records:8192}")
  59 + @Getter
  60 + private int maxPollRecords;
  61 +
  62 + @Value("${queue.kafka.max_partition_fetch_bytes:16777216}")
  63 + @Getter
  64 + private int maxPartitionFetchBytes;
  65 +
  66 + @Value("${queue.kafka.fetch_max_bytes:134217728}")
  67 + @Getter
  68 + private int fetchMaxBytes;
  69 +
58 @Value("${kafka.other:#{null}}") 70 @Value("${kafka.other:#{null}}")
59 private List<TbKafkaProperty> other; 71 private List<TbKafkaProperty> other;
60 72