Commit e318b193bd604eaa1e124d3f44f438838e8b20a6
Committed by
GitHub
1 parent
52bed174
Develop/2.5.3 confluent cloud (#3259)
* added other parameters for queue kafka * Added support Confluent Cloud * fix js executor kafka connection * refactored
Showing
11 changed files
with
166 additions
and
20 deletions
... | ... | @@ -605,16 +605,13 @@ queue: |
605 | 605 | max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" |
606 | 606 | max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" |
607 | 607 | fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" |
608 | + use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" | |
609 | + confluent: | |
610 | + ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" | |
611 | + sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" | |
612 | + sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" | |
613 | + security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" | |
608 | 614 | other: |
609 | -# Properties for Confluent cloud | |
610 | -# - key: "ssl.endpoint.identification.algorithm" | |
611 | -# value: "https" | |
612 | -# - key: "sasl.mechanism" | |
613 | -# value: "PLAIN" | |
614 | -# - key: "sasl.jaas.config" | |
615 | -# value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";" | |
616 | -# - key: "security.protocol" | |
617 | -# value: "SASL_SSL" | |
618 | 615 | topic-properties: |
619 | 616 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
620 | 617 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | ... | ... |
... | ... | @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.kafka; |
18 | 18 | import lombok.Getter; |
19 | 19 | import lombok.Setter; |
20 | 20 | import lombok.extern.slf4j.Slf4j; |
21 | +import org.apache.kafka.clients.CommonClientConfigs; | |
21 | 22 | import org.apache.kafka.clients.producer.ProducerConfig; |
22 | 23 | import org.springframework.beans.factory.annotation.Value; |
23 | 24 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
... | ... | @@ -68,7 +69,22 @@ public class TbKafkaSettings { |
68 | 69 | |
69 | 70 | @Value("${queue.kafka.fetch_max_bytes:134217728}") |
70 | 71 | @Getter |
71 | - private int fetchMaxBytes; | |
72 | + private int fetchMaxBytes; | |
73 | + | |
74 | + @Value("${queue.kafka.use_confluent_cloud:false}") | |
75 | + private boolean useConfluent; | |
76 | + | |
77 | + @Value("${queue.kafka.confluent.ssl.algorithm}") | |
78 | + private String sslAlgorithm; | |
79 | + | |
80 | + @Value("${queue.kafka.confluent.sasl.mechanism}") | |
81 | + private String saslMechanism; | |
82 | + | |
83 | + @Value("${queue.kafka.confluent.sasl.config}") | |
84 | + private String saslConfig; | |
85 | + | |
86 | + @Value("${queue.kafka.confluent.security.protocol}") | |
87 | + private String securityProtocol; | |
72 | 88 | |
73 | 89 | @Setter |
74 | 90 | private List<TbKafkaProperty> other; |
... | ... | @@ -76,12 +92,21 @@ public class TbKafkaSettings { |
76 | 92 | public Properties toProps() { |
77 | 93 | Properties props = new Properties(); |
78 | 94 | props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); |
79 | - props.put(ProducerConfig.ACKS_CONFIG, acks); | |
80 | 95 | props.put(ProducerConfig.RETRIES_CONFIG, retries); |
81 | - props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); | |
82 | - props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); | |
83 | - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); | |
84 | - if(other != null){ | |
96 | + | |
97 | + if (useConfluent) { | |
98 | + props.put("ssl.endpoint.identification.algorithm", sslAlgorithm); | |
99 | + props.put("sasl.mechanism", saslMechanism); | |
100 | + props.put("sasl.jaas.config", saslConfig); | |
101 | + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); | |
102 | + } else { | |
103 | + props.put(ProducerConfig.ACKS_CONFIG, acks); | |
104 | + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); | |
105 | + props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); | |
106 | + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); | |
107 | + } | |
108 | + | |
109 | + if (other != null) { | |
85 | 110 | other.forEach(kv -> props.put(kv.getKey(), kv.getValue())); |
86 | 111 | } |
87 | 112 | return props; | ... | ... |
... | ... | @@ -39,6 +39,9 @@ function additionalComposeQueueArgs() { |
39 | 39 | kafka) |
40 | 40 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.kafka.yml" |
41 | 41 | ;; |
42 | + confluent) | |
43 | + ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.confluent.yml" | |
44 | + ;; | |
42 | 45 | aws-sqs) |
43 | 46 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.aws-sqs.yml" |
44 | 47 | ;; |
... | ... | @@ -52,7 +55,7 @@ function additionalComposeQueueArgs() { |
52 | 55 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.service-bus.yml" |
53 | 56 | ;; |
54 | 57 | *) |
55 | - echo "Unknown Queue service value specified: '${TB_QUEUE_TYPE}'. Should be either kafka or aws-sqs or pubsub or rabbitmq or service-bus." >&2 | |
58 | + echo "Unknown Queue service value specified: '${TB_QUEUE_TYPE}'. Should be either kafka or confluent or aws-sqs or pubsub or rabbitmq or service-bus." >&2 | |
56 | 59 | exit 1 |
57 | 60 | esac |
58 | 61 | echo $ADDITIONAL_COMPOSE_QUEUE_ARGS | ... | ... |
docker/docker-compose.confluent.yml
0 → 100644
1 | +# | |
2 | +# Copyright © 2016-2020 The Thingsboard Authors | |
3 | +# | |
4 | +# Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | +# you may not use this file except in compliance with the License. | |
6 | +# You may obtain a copy of the License at | |
7 | +# | |
8 | +# http://www.apache.org/licenses/LICENSE-2.0 | |
9 | +# | |
10 | +# Unless required by applicable law or agreed to in writing, software | |
11 | +# distributed under the License is distributed on an "AS IS" BASIS, | |
12 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | +# See the License for the specific language governing permissions and | |
14 | +# limitations under the License. | |
15 | +# | |
16 | + | |
17 | +version: '2.2' | |
18 | + | |
19 | +services: | |
20 | + tb-js-executor: | |
21 | + env_file: | |
22 | + - queue-confluent.env | |
23 | + tb-core1: | |
24 | + env_file: | |
25 | + - queue-confluent.env | |
26 | + depends_on: | |
27 | + - redis | |
28 | + tb-core2: | |
29 | + env_file: | |
30 | + - queue-confluent.env | |
31 | + depends_on: | |
32 | + - redis | |
33 | + tb-rule-engine1: | |
34 | + env_file: | |
35 | + - queue-confluent.env | |
36 | + depends_on: | |
37 | + - redis | |
38 | + tb-rule-engine2: | |
39 | + env_file: | |
40 | + - queue-confluent.env | |
41 | + depends_on: | |
42 | + - redis | |
43 | + tb-mqtt-transport1: | |
44 | + env_file: | |
45 | + - queue-confluent.env | |
46 | + tb-mqtt-transport2: | |
47 | + env_file: | |
48 | + - queue-confluent.env | |
49 | + tb-http-transport1: | |
50 | + env_file: | |
51 | + - queue-confluent.env | |
52 | + tb-http-transport2: | |
53 | + env_file: | |
54 | + - queue-confluent.env | |
55 | + tb-coap-transport: | |
56 | + env_file: | |
57 | + - queue-confluent.env | ... | ... |
docker/queue-confluent.env
0 → 100644
1 | +TB_QUEUE_TYPE=kafka | |
2 | + | |
3 | +TB_KAFKA_SERVERS=confluent.cloud:9092 | |
4 | +TB_QUEUE_KAFKA_REPLICATION_FACTOR=3 | |
5 | + | |
6 | +TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD=true | |
7 | +TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM=https | |
8 | +TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM=PLAIN | |
9 | +TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET"; | |
10 | +TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL=SASL_SSL | |
11 | +TB_QUEUE_KAFKA_CONFLUENT_USERNAME=CLUSTER_API_KEY | |
12 | +TB_QUEUE_KAFKA_CONFLUENT_PASSWORD=CLUSTER_API_SECRET | |
13 | + | |
14 | +TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000 | |
15 | +TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000 | |
16 | +TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000 | |
17 | +TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000 | |
18 | +TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:104857600 | ... | ... |
... | ... | @@ -26,6 +26,12 @@ kafka: |
26 | 26 | servers: "TB_KAFKA_SERVERS" |
27 | 27 | replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" |
28 | 28 | topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" |
29 | + use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" | |
30 | + confluent: | |
31 | + sasl: | |
32 | + mechanism: "TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM" | |
33 | + username: "TB_QUEUE_KAFKA_CONFLUENT_USERNAME" | |
34 | + password: "TB_QUEUE_KAFKA_CONFLUENT_PASSWORD" | |
29 | 35 | |
30 | 36 | pubsub: |
31 | 37 | project_id: "TB_QUEUE_PUBSUB_PROJECT_ID" | ... | ... |
... | ... | @@ -26,6 +26,10 @@ kafka: |
26 | 26 | servers: "localhost:9092" |
27 | 27 | replication_factor: "1" |
28 | 28 | topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600" |
29 | + use_confluent_cloud: false | |
30 | + confluent: | |
31 | + sasl: | |
32 | + mechanism: "PLAIN" | |
29 | 33 | |
30 | 34 | pubsub: |
31 | 35 | queue_properties: "ackDeadlineInSec:30;messageRetentionInSec:604800" | ... | ... |
... | ... | @@ -61,15 +61,27 @@ function KafkaProducer() { |
61 | 61 | |
62 | 62 | const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); |
63 | 63 | const requestTopic = config.get('request_topic'); |
64 | + const useConfluent = config.get('kafka.use_confluent_cloud'); | |
64 | 65 | |
65 | 66 | logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); |
66 | 67 | logger.info('Kafka Requests Topic: %s', requestTopic); |
67 | 68 | |
68 | - kafkaClient = new Kafka({ | |
69 | + let kafkaConfig = { | |
69 | 70 | brokers: kafkaBootstrapServers.split(','), |
70 | - logLevel: logLevel.INFO, | |
71 | - logCreator: KafkaJsWinstonLogCreator | |
72 | - }); | |
71 | + logLevel: logLevel.INFO, | |
72 | + logCreator: KafkaJsWinstonLogCreator | |
73 | + }; | |
74 | + | |
75 | + if (useConfluent) { | |
76 | + kafkaConfig['sasl'] = { | |
77 | + mechanism: config.get('kafka.confluent.sasl.mechanism'), | |
78 | + username: config.get('kafka.confluent.username'), | |
79 | + password: config.get('kafka.confluent.password') | |
80 | + }; | |
81 | + kafkaConfig['ssl'] = true; | |
82 | + } | |
83 | + | |
84 | + kafkaClient = new Kafka(kafkaConfig); | |
73 | 85 | |
74 | 86 | parseTopicProperties(); |
75 | 87 | ... | ... |
... | ... | @@ -69,6 +69,13 @@ queue: |
69 | 69 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" |
70 | 70 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" |
71 | 71 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" |
72 | + use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" | |
73 | + confluent: | |
74 | + ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" | |
75 | + sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" | |
76 | + sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" | |
77 | + security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" | |
78 | + other: | |
72 | 79 | topic-properties: |
73 | 80 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
74 | 81 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
... | ... | @@ -76,6 +83,7 @@ queue: |
76 | 83 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
77 | 84 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" |
78 | 85 | aws_sqs: |
86 | + use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" | |
79 | 87 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" |
80 | 88 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
81 | 89 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" | ... | ... |
... | ... | @@ -62,6 +62,13 @@ queue: |
62 | 62 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" |
63 | 63 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" |
64 | 64 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" |
65 | + use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" | |
66 | + confluent: | |
67 | + ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" | |
68 | + sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" | |
69 | + sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" | |
70 | + security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" | |
71 | + other: | |
65 | 72 | topic-properties: |
66 | 73 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
67 | 74 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
... | ... | @@ -69,6 +76,7 @@ queue: |
69 | 76 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
70 | 77 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" |
71 | 78 | aws_sqs: |
79 | + use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" | |
72 | 80 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" |
73 | 81 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
74 | 82 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" | ... | ... |
... | ... | @@ -90,6 +90,13 @@ queue: |
90 | 90 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" |
91 | 91 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" |
92 | 92 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" |
93 | + use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" | |
94 | + confluent: | |
95 | + ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" | |
96 | + sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" | |
97 | + sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" | |
98 | + security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" | |
99 | + other: | |
93 | 100 | topic-properties: |
94 | 101 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
95 | 102 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
... | ... | @@ -97,6 +104,7 @@ queue: |
97 | 104 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
98 | 105 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" |
99 | 106 | aws_sqs: |
107 | + use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" | |
100 | 108 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" |
101 | 109 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
102 | 110 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" | ... | ... |