Commit 1d24a08c17df3788daebb4b2b67efa52ea6b07c6
Committed by
Andrew Shvayka
1 parent
c8488aa5
kafka: added TB_KAFKA_COMPRESSION_TYPE for producer: none (default), gzip.
Showing
2 changed files
with
5 additions
and
0 deletions
@@ -730,6 +730,7 @@ queue: | @@ -730,6 +730,7 @@ queue: | ||
730 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" | 730 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" |
731 | acks: "${TB_KAFKA_ACKS:all}" | 731 | acks: "${TB_KAFKA_ACKS:all}" |
732 | retries: "${TB_KAFKA_RETRIES:1}" | 732 | retries: "${TB_KAFKA_RETRIES:1}" |
733 | + compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip | ||
733 | batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" | 734 | batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" |
734 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" | 735 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" |
735 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" | 736 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" |
@@ -54,6 +54,9 @@ public class TbKafkaSettings { | @@ -54,6 +54,9 @@ public class TbKafkaSettings { | ||
54 | @Value("${queue.kafka.retries}") | 54 | @Value("${queue.kafka.retries}") |
55 | private int retries; | 55 | private int retries; |
56 | 56 | ||
57 | + @Value("${queue.kafka.compression.type:none}") | ||
58 | + private String compressionType; | ||
59 | + | ||
57 | @Value("${queue.kafka.batch.size}") | 60 | @Value("${queue.kafka.batch.size}") |
58 | private int batchSize; | 61 | private int batchSize; |
59 | 62 | ||
@@ -135,6 +138,7 @@ public class TbKafkaSettings { | @@ -135,6 +138,7 @@ public class TbKafkaSettings { | ||
135 | props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); | 138 | props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); |
136 | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | 139 | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); |
137 | props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); | 140 | props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); |
141 | + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType); | ||
138 | return props; | 142 | return props; |
139 | } | 143 | } |
140 | 144 |