Commit 01bf53b7f5a92214d6af98cad9f0cc6c09d09296
Committed by
Andrew Shvayka
1 parent
3236d3ce
remote js-executor: parameter added TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS
Showing
3 changed files
with
9 additions
and
2 deletions
@@ -26,6 +26,7 @@ kafka: | @@ -26,6 +26,7 @@ kafka: | ||
26 | servers: "TB_KAFKA_SERVERS" | 26 | servers: "TB_KAFKA_SERVERS" |
27 | replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" | 27 | replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" |
28 | acks: "TB_KAFKA_ACKS" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge | 28 | acks: "TB_KAFKA_ACKS" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge |
29 | + requestTimeout: "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS" | ||
29 | topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" | 30 | topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" |
30 | use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" | 31 | use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" |
31 | client_id: "KAFKA_CLIENT_ID" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh | 32 | client_id: "KAFKA_CLIENT_ID" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh |
@@ -25,6 +25,8 @@ kafka: | @@ -25,6 +25,8 @@ kafka: | ||
25 | # Kafka Bootstrap Servers | 25 | # Kafka Bootstrap Servers |
26 | servers: "localhost:9092" | 26 | servers: "localhost:9092" |
27 | replication_factor: "1" | 27 | replication_factor: "1" |
28 | + acks: "1" # -1 = all; 0 = no acknowledgments; 1 = only waits for the leader to acknowledge | ||
29 | + requestTimeout: "30000" # The default value in kafkajs is: 30000 | ||
28 | topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1" | 30 | topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1" |
29 | use_confluent_cloud: false | 31 | use_confluent_cloud: false |
30 | client_id: "kafkajs" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh | 32 | client_id: "kafkajs" #inject pod name to easy identify the client using /opt/kafka/bin/kafka-consumer-groups.sh |
@@ -19,10 +19,11 @@ const config = require('config'), | @@ -19,10 +19,11 @@ const config = require('config'), | ||
19 | JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), | 19 | JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
20 | logger = require('../config/logger')._logger('kafkaTemplate'), | 20 | logger = require('../config/logger')._logger('kafkaTemplate'), |
21 | KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; | 21 | KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; |
22 | -const replicationFactor = config.get('kafka.replication_factor'); | 22 | +const replicationFactor = Number(config.get('kafka.replication_factor')); |
23 | const topicProperties = config.get('kafka.topic_properties'); | 23 | const topicProperties = config.get('kafka.topic_properties'); |
24 | const kafkaClientId = config.get('kafka.client_id'); | 24 | const kafkaClientId = config.get('kafka.client_id'); |
25 | -const acks = config.get('kafka.acks'); | 25 | +const acks = Number(config.get('kafka.acks')); |
26 | +const requestTimeout = Number(config.get('kafka.requestTimeout')); | ||
26 | 27 | ||
27 | let kafkaClient; | 28 | let kafkaClient; |
28 | let kafkaAdmin; | 29 | let kafkaAdmin; |
@@ -72,6 +73,8 @@ function KafkaProducer() { | @@ -72,6 +73,8 @@ function KafkaProducer() { | ||
72 | logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); | 73 | logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); |
73 | } | 74 | } |
74 | 75 | ||
76 | + kafkaConfig['requestTimeout'] = requestTimeout; | ||
77 | + | ||
75 | if (useConfluent) { | 78 | if (useConfluent) { |
76 | kafkaConfig['sasl'] = { | 79 | kafkaConfig['sasl'] = { |
77 | mechanism: config.get('kafka.confluent.sasl.mechanism'), | 80 | mechanism: config.get('kafka.confluent.sasl.mechanism'), |
@@ -117,6 +120,7 @@ function KafkaProducer() { | @@ -117,6 +120,7 @@ function KafkaProducer() { | ||
117 | 120 | ||
118 | logger.info('Started ThingsBoard JavaScript Executor Microservice.'); | 121 | logger.info('Started ThingsBoard JavaScript Executor Microservice.'); |
119 | await consumer.run({ | 122 | await consumer.run({ |
123 | + //partitionsConsumedConcurrently: 1, // Default: 1 | ||
120 | eachMessage: async ({topic, partition, message}) => { | 124 | eachMessage: async ({topic, partition, message}) => { |
121 | let headers = message.headers; | 125 | let headers = message.headers; |
122 | let key = message.key; | 126 | let key = message.key; |