Commit 86102aea06735fcd7f7f5d306a55817eb2b23d34

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent a8fe8b9e

added other parameters for queue kafka

@@ -605,6 +605,16 @@ queue: @@ -605,6 +605,16 @@ queue:
605 max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" 605 max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
606 max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" 606 max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
607 fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" 607 fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
  608 + other:
  609 +# Properties for Confluent cloud
  610 +# - key: "ssl.endpoint.identification.algorithm"
  611 +# value: "https"
  612 +# - key: "sasl.mechanism"
  613 +# value: "PLAIN"
  614 +# - key: "sasl.jaas.config"
  615 +# value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";"
  616 +# - key: "security.protocol"
  617 +# value: "SASL_SSL"
608 topic-properties: 618 topic-properties:
609 rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" 619 rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
610 core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" 620 core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
@@ -16,10 +16,12 @@ @@ -16,10 +16,12 @@
16 package org.thingsboard.server.queue.kafka; 16 package org.thingsboard.server.queue.kafka;
17 17
18 import lombok.Getter; 18 import lombok.Getter;
  19 +import lombok.Setter;
19 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
20 import org.apache.kafka.clients.producer.ProducerConfig; 21 import org.apache.kafka.clients.producer.ProducerConfig;
21 import org.springframework.beans.factory.annotation.Value; 22 import org.springframework.beans.factory.annotation.Value;
22 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; 23 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  24 +import org.springframework.boot.context.properties.ConfigurationProperties;
23 import org.springframework.stereotype.Component; 25 import org.springframework.stereotype.Component;
24 26
25 import java.util.List; 27 import java.util.List;
@@ -30,6 +32,7 @@ import java.util.Properties; @@ -30,6 +32,7 @@ import java.util.Properties;
30 */ 32 */
31 @Slf4j 33 @Slf4j
32 @ConditionalOnExpression("'${queue.type:null}'=='kafka'") 34 @ConditionalOnExpression("'${queue.type:null}'=='kafka'")
  35 +@ConfigurationProperties(prefix = "queue.kafka")
33 @Component 36 @Component
34 public class TbKafkaSettings { 37 public class TbKafkaSettings {
35 38
@@ -67,7 +70,7 @@ public class TbKafkaSettings { @@ -67,7 +70,7 @@ public class TbKafkaSettings {
67 @Getter 70 @Getter
68 private int fetchMaxBytes; 71 private int fetchMaxBytes;
69 72
70 - @Value("${kafka.other:#{null}}") 73 + @Setter
71 private List<TbKafkaProperty> other; 74 private List<TbKafkaProperty> other;
72 75
73 public Properties toProps() { 76 public Properties toProps() {