Commit 9a4a94621a88dd7e6e329dc1d7767ff75e311e16

Authored by Sergey Matvienko
1 parent 3a31b9c5

js-executor: parameter added for Kafka PARTITIONS_CONSUMED_CONCURRENTLY to decre…

…ase max latency while scale down replicas
@@ -29,6 +29,7 @@ kafka: @@ -29,6 +29,7 @@ kafka:
29 acks: "TB_KAFKA_ACKS" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge 29 acks: "TB_KAFKA_ACKS" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge
30 batch_size: "${TB_KAFKA_BATCH_SIZE:128}" # for producer 30 batch_size: "${TB_KAFKA_BATCH_SIZE:128}" # for producer
31 linger_ms: "${TB_KAFKA_LINGER_MS:1}" # for producer 31 linger_ms: "${TB_KAFKA_LINGER_MS:1}" # for producer
  32 + partitions_consumed_concurrently: "${PARTITIONS_CONSUMED_CONCURRENTLY:1}" # increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency
32 requestTimeout: "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS" 33 requestTimeout: "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS"
33 compression: "TB_QUEUE_KAFKA_COMPRESSION" # gzip or uncompressed 34 compression: "TB_QUEUE_KAFKA_COMPRESSION" # gzip or uncompressed
34 topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" 35 topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
@@ -29,6 +29,7 @@ kafka: @@ -29,6 +29,7 @@ kafka:
29 acks: "1" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge 29 acks: "1" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge
30 batch_size: "128" # for producer 30 batch_size: "128" # for producer
31 linger_ms: "1" # for producer 31 linger_ms: "1" # for producer
  32 + partitions_consumed_concurrently: "1" # increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency
32 requestTimeout: "30000" # The default value in kafkajs is: 30000 33 requestTimeout: "30000" # The default value in kafkajs is: 30000
33 compression: "gzip" # gzip or uncompressed 34 compression: "gzip" # gzip or uncompressed
34 topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1" 35 topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1"
@@ -27,6 +27,7 @@ const maxBatchSize = Number(config.get('kafka.batch_size')); @@ -27,6 +27,7 @@ const maxBatchSize = Number(config.get('kafka.batch_size'));
27 const linger = Number(config.get('kafka.linger_ms')); 27 const linger = Number(config.get('kafka.linger_ms'));
28 const requestTimeout = Number(config.get('kafka.requestTimeout')); 28 const requestTimeout = Number(config.get('kafka.requestTimeout'));
29 const compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None; 29 const compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None;
  30 +const partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently'));
30 31
31 let kafkaClient; 32 let kafkaClient;
32 let kafkaAdmin; 33 let kafkaAdmin;
@@ -197,7 +198,7 @@ async function sendMessagesAsBatch(isImmediately) { @@ -197,7 +198,7 @@ async function sendMessagesAsBatch(isImmediately) {
197 198
198 logger.info('Started ThingsBoard JavaScript Executor Microservice.'); 199 logger.info('Started ThingsBoard JavaScript Executor Microservice.');
199 await consumer.run({ 200 await consumer.run({
200 - //partitionsConsumedConcurrently: 1, // Default: 1 201 + partitionsConsumedConcurrently: partitionsConsumedConcurrently,
201 eachMessage: async ({topic, partition, message}) => { 202 eachMessage: async ({topic, partition, message}) => {
202 let headers = message.headers; 203 let headers = message.headers;
203 let key = message.key; 204 let key = message.key;