Showing
14 changed files
with
189 additions
and
30 deletions
@@ -619,16 +619,13 @@ queue: | @@ -619,16 +619,13 @@ queue: | ||
619 | max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" | 619 | max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" |
620 | max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" | 620 | max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" |
621 | fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" | 621 | fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" |
622 | + use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" | ||
623 | + confluent: | ||
624 | + ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" | ||
625 | + sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" | ||
626 | + sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" | ||
627 | + security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" | ||
622 | other: | 628 | other: |
623 | -# Properties for Confluent cloud | ||
624 | -# - key: "ssl.endpoint.identification.algorithm" | ||
625 | -# value: "https" | ||
626 | -# - key: "sasl.mechanism" | ||
627 | -# value: "PLAIN" | ||
628 | -# - key: "sasl.jaas.config" | ||
629 | -# value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";" | ||
630 | -# - key: "security.protocol" | ||
631 | -# value: "SASL_SSL" | ||
632 | topic-properties: | 629 | topic-properties: |
633 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 630 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
634 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 631 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
@@ -18,6 +18,7 @@ package org.thingsboard.server.queue.kafka; | @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.kafka; | ||
18 | import lombok.Getter; | 18 | import lombok.Getter; |
19 | import lombok.Setter; | 19 | import lombok.Setter; |
20 | import lombok.extern.slf4j.Slf4j; | 20 | import lombok.extern.slf4j.Slf4j; |
21 | +import org.apache.kafka.clients.CommonClientConfigs; | ||
21 | import org.apache.kafka.clients.producer.ProducerConfig; | 22 | import org.apache.kafka.clients.producer.ProducerConfig; |
22 | import org.springframework.beans.factory.annotation.Value; | 23 | import org.springframework.beans.factory.annotation.Value; |
23 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | 24 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
@@ -68,7 +69,22 @@ public class TbKafkaSettings { | @@ -68,7 +69,22 @@ public class TbKafkaSettings { | ||
68 | 69 | ||
69 | @Value("${queue.kafka.fetch_max_bytes:134217728}") | 70 | @Value("${queue.kafka.fetch_max_bytes:134217728}") |
70 | @Getter | 71 | @Getter |
71 | - private int fetchMaxBytes; | 72 | + private int fetchMaxBytes; |
73 | + | ||
74 | + @Value("${queue.kafka.use_confluent_cloud:false}") | ||
75 | + private boolean useConfluent; | ||
76 | + | ||
77 | + @Value("${queue.kafka.confluent.ssl.algorithm}") | ||
78 | + private String sslAlgorithm; | ||
79 | + | ||
80 | + @Value("${queue.kafka.confluent.sasl.mechanism}") | ||
81 | + private String saslMechanism; | ||
82 | + | ||
83 | + @Value("${queue.kafka.confluent.sasl.config}") | ||
84 | + private String saslConfig; | ||
85 | + | ||
86 | + @Value("${queue.kafka.confluent.security.protocol}") | ||
87 | + private String securityProtocol; | ||
72 | 88 | ||
73 | @Setter | 89 | @Setter |
74 | private List<TbKafkaProperty> other; | 90 | private List<TbKafkaProperty> other; |
@@ -76,12 +92,21 @@ public class TbKafkaSettings { | @@ -76,12 +92,21 @@ public class TbKafkaSettings { | ||
76 | public Properties toProps() { | 92 | public Properties toProps() { |
77 | Properties props = new Properties(); | 93 | Properties props = new Properties(); |
78 | props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); | 94 | props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); |
79 | - props.put(ProducerConfig.ACKS_CONFIG, acks); | ||
80 | props.put(ProducerConfig.RETRIES_CONFIG, retries); | 95 | props.put(ProducerConfig.RETRIES_CONFIG, retries); |
81 | - props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); | ||
82 | - props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); | ||
83 | - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); | ||
84 | - if(other != null){ | 96 | + |
97 | + if (useConfluent) { | ||
98 | + props.put("ssl.endpoint.identification.algorithm", sslAlgorithm); | ||
99 | + props.put("sasl.mechanism", saslMechanism); | ||
100 | + props.put("sasl.jaas.config", saslConfig); | ||
101 | + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); | ||
102 | + } else { | ||
103 | + props.put(ProducerConfig.ACKS_CONFIG, acks); | ||
104 | + props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); | ||
105 | + props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); | ||
106 | + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); | ||
107 | + } | ||
108 | + | ||
109 | + if (other != null) { | ||
85 | other.forEach(kv -> props.put(kv.getKey(), kv.getValue())); | 110 | other.forEach(kv -> props.put(kv.getKey(), kv.getValue())); |
86 | } | 111 | } |
87 | return props; | 112 | return props; |
@@ -39,6 +39,9 @@ function additionalComposeQueueArgs() { | @@ -39,6 +39,9 @@ function additionalComposeQueueArgs() { | ||
39 | kafka) | 39 | kafka) |
40 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.kafka.yml" | 40 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.kafka.yml" |
41 | ;; | 41 | ;; |
42 | + confluent) | ||
43 | + ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.confluent.yml" | ||
44 | + ;; | ||
42 | aws-sqs) | 45 | aws-sqs) |
43 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.aws-sqs.yml" | 46 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.aws-sqs.yml" |
44 | ;; | 47 | ;; |
@@ -52,7 +55,7 @@ function additionalComposeQueueArgs() { | @@ -52,7 +55,7 @@ function additionalComposeQueueArgs() { | ||
52 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.service-bus.yml" | 55 | ADDITIONAL_COMPOSE_QUEUE_ARGS="-f docker-compose.service-bus.yml" |
53 | ;; | 56 | ;; |
54 | *) | 57 | *) |
55 | - echo "Unknown Queue service value specified: '${TB_QUEUE_TYPE}'. Should be either kafka or aws-sqs or pubsub or rabbitmq or service-bus." >&2 | 58 | + echo "Unknown Queue service value specified: '${TB_QUEUE_TYPE}'. Should be either kafka or confluent or aws-sqs or pubsub or rabbitmq or service-bus." >&2 |
56 | exit 1 | 59 | exit 1 |
57 | esac | 60 | esac |
58 | echo $ADDITIONAL_COMPOSE_QUEUE_ARGS | 61 | echo $ADDITIONAL_COMPOSE_QUEUE_ARGS |
docker/docker-compose.confluent.yml
0 → 100644
1 | +# | ||
2 | +# Copyright © 2016-2020 The Thingsboard Authors | ||
3 | +# | ||
4 | +# Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | +# you may not use this file except in compliance with the License. | ||
6 | +# You may obtain a copy of the License at | ||
7 | +# | ||
8 | +# http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | +# | ||
10 | +# Unless required by applicable law or agreed to in writing, software | ||
11 | +# distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | +# See the License for the specific language governing permissions and | ||
14 | +# limitations under the License. | ||
15 | +# | ||
16 | + | ||
17 | +version: '2.2' | ||
18 | + | ||
19 | +services: | ||
20 | + tb-js-executor: | ||
21 | + env_file: | ||
22 | + - queue-confluent.env | ||
23 | + tb-core1: | ||
24 | + env_file: | ||
25 | + - queue-confluent.env | ||
26 | + depends_on: | ||
27 | + - redis | ||
28 | + tb-core2: | ||
29 | + env_file: | ||
30 | + - queue-confluent.env | ||
31 | + depends_on: | ||
32 | + - redis | ||
33 | + tb-rule-engine1: | ||
34 | + env_file: | ||
35 | + - queue-confluent.env | ||
36 | + depends_on: | ||
37 | + - redis | ||
38 | + tb-rule-engine2: | ||
39 | + env_file: | ||
40 | + - queue-confluent.env | ||
41 | + depends_on: | ||
42 | + - redis | ||
43 | + tb-mqtt-transport1: | ||
44 | + env_file: | ||
45 | + - queue-confluent.env | ||
46 | + tb-mqtt-transport2: | ||
47 | + env_file: | ||
48 | + - queue-confluent.env | ||
49 | + tb-http-transport1: | ||
50 | + env_file: | ||
51 | + - queue-confluent.env | ||
52 | + tb-http-transport2: | ||
53 | + env_file: | ||
54 | + - queue-confluent.env | ||
55 | + tb-coap-transport: | ||
56 | + env_file: | ||
57 | + - queue-confluent.env |
docker/queue-confluent.env
0 → 100644
1 | +TB_QUEUE_TYPE=kafka | ||
2 | + | ||
3 | +TB_KAFKA_SERVERS=confluent.cloud:9092 | ||
4 | +TB_QUEUE_KAFKA_REPLICATION_FACTOR=3 | ||
5 | + | ||
6 | +TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD=true | ||
7 | +TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM=https | ||
8 | +TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM=PLAIN | ||
9 | +TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET"; | ||
10 | +TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL=SASL_SSL | ||
11 | +TB_QUEUE_KAFKA_CONFLUENT_USERNAME=CLUSTER_API_KEY | ||
12 | +TB_QUEUE_KAFKA_CONFLUENT_PASSWORD=CLUSTER_API_SECRET | ||
13 | + | ||
14 | +TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000 | ||
15 | +TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000 | ||
16 | +TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000 | ||
17 | +TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000 | ||
18 | +TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:52428800;retention.bytes:104857600 |
@@ -26,6 +26,12 @@ kafka: | @@ -26,6 +26,12 @@ kafka: | ||
26 | servers: "TB_KAFKA_SERVERS" | 26 | servers: "TB_KAFKA_SERVERS" |
27 | replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" | 27 | replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" |
28 | topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" | 28 | topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" |
29 | + use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD" | ||
30 | + confluent: | ||
31 | + sasl: | ||
32 | + mechanism: "TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM" | ||
33 | + username: "TB_QUEUE_KAFKA_CONFLUENT_USERNAME" | ||
34 | + password: "TB_QUEUE_KAFKA_CONFLUENT_PASSWORD" | ||
29 | 35 | ||
30 | pubsub: | 36 | pubsub: |
31 | project_id: "TB_QUEUE_PUBSUB_PROJECT_ID" | 37 | project_id: "TB_QUEUE_PUBSUB_PROJECT_ID" |
@@ -26,6 +26,10 @@ kafka: | @@ -26,6 +26,10 @@ kafka: | ||
26 | servers: "localhost:9092" | 26 | servers: "localhost:9092" |
27 | replication_factor: "1" | 27 | replication_factor: "1" |
28 | topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600" | 28 | topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600" |
29 | + use_confluent_cloud: false | ||
30 | + confluent: | ||
31 | + sasl: | ||
32 | + mechanism: "PLAIN" | ||
29 | 33 | ||
30 | pubsub: | 34 | pubsub: |
31 | queue_properties: "ackDeadlineInSec:30;messageRetentionInSec:604800" | 35 | queue_properties: "ackDeadlineInSec:30;messageRetentionInSec:604800" |
@@ -61,15 +61,27 @@ function KafkaProducer() { | @@ -61,15 +61,27 @@ function KafkaProducer() { | ||
61 | 61 | ||
62 | const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); | 62 | const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); |
63 | const requestTopic = config.get('request_topic'); | 63 | const requestTopic = config.get('request_topic'); |
64 | + const useConfluent = config.get('kafka.use_confluent_cloud'); | ||
64 | 65 | ||
65 | logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); | 66 | logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); |
66 | logger.info('Kafka Requests Topic: %s', requestTopic); | 67 | logger.info('Kafka Requests Topic: %s', requestTopic); |
67 | 68 | ||
68 | - kafkaClient = new Kafka({ | 69 | + let kafkaConfig = { |
69 | brokers: kafkaBootstrapServers.split(','), | 70 | brokers: kafkaBootstrapServers.split(','), |
70 | - logLevel: logLevel.INFO, | ||
71 | - logCreator: KafkaJsWinstonLogCreator | ||
72 | - }); | 71 | + logLevel: logLevel.INFO, |
72 | + logCreator: KafkaJsWinstonLogCreator | ||
73 | + }; | ||
74 | + | ||
75 | + if (useConfluent) { | ||
76 | + kafkaConfig['sasl'] = { | ||
77 | + mechanism: config.get('kafka.confluent.sasl.mechanism'), | ||
78 | + username: config.get('kafka.confluent.username'), | ||
79 | + password: config.get('kafka.confluent.password') | ||
80 | + }; | ||
81 | + kafkaConfig['ssl'] = true; | ||
82 | + } | ||
83 | + | ||
84 | + kafkaClient = new Kafka(kafkaConfig); | ||
73 | 85 | ||
74 | parseTopicProperties(); | 86 | parseTopicProperties(); |
75 | 87 |
@@ -17,11 +17,15 @@ package org.thingsboard.rule.engine.api.util; | @@ -17,11 +17,15 @@ package org.thingsboard.rule.engine.api.util; | ||
17 | 17 | ||
18 | import com.fasterxml.jackson.core.JsonProcessingException; | 18 | import com.fasterxml.jackson.core.JsonProcessingException; |
19 | import com.fasterxml.jackson.databind.ObjectMapper; | 19 | import com.fasterxml.jackson.databind.ObjectMapper; |
20 | +import org.springframework.util.CollectionUtils; | ||
20 | import org.thingsboard.rule.engine.api.TbNodeConfiguration; | 21 | import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
21 | import org.thingsboard.rule.engine.api.TbNodeException; | 22 | import org.thingsboard.rule.engine.api.TbNodeException; |
22 | import org.thingsboard.server.common.msg.TbMsgMetaData; | 23 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
23 | 24 | ||
25 | +import java.util.Collections; | ||
26 | +import java.util.List; | ||
24 | import java.util.Map; | 27 | import java.util.Map; |
28 | +import java.util.stream.Collectors; | ||
25 | 29 | ||
26 | /** | 30 | /** |
27 | * Created by ashvayka on 19.01.18. | 31 | * Created by ashvayka on 19.01.18. |
@@ -41,6 +45,13 @@ public class TbNodeUtils { | @@ -41,6 +45,13 @@ public class TbNodeUtils { | ||
41 | } | 45 | } |
42 | } | 46 | } |
43 | 47 | ||
48 | + public static List<String> processPatterns(List<String> patterns, TbMsgMetaData metaData) { | ||
49 | + if (!CollectionUtils.isEmpty(patterns)) { | ||
50 | + return patterns.stream().map(p -> processPattern(p, metaData)).collect(Collectors.toList()); | ||
51 | + } | ||
52 | + return Collections.emptyList(); | ||
53 | + } | ||
54 | + | ||
44 | public static String processPattern(String pattern, TbMsgMetaData metaData) { | 55 | public static String processPattern(String pattern, TbMsgMetaData metaData) { |
45 | String result = new String(pattern); | 56 | String result = new String(pattern); |
46 | for (Map.Entry<String,String> keyVal : metaData.values().entrySet()) { | 57 | for (Map.Entry<String,String> keyVal : metaData.values().entrySet()) { |
@@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbContext; | @@ -29,6 +29,7 @@ import org.thingsboard.rule.engine.api.TbContext; | ||
29 | import org.thingsboard.rule.engine.api.TbNode; | 29 | import org.thingsboard.rule.engine.api.TbNode; |
30 | import org.thingsboard.rule.engine.api.TbNodeConfiguration; | 30 | import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
31 | import org.thingsboard.rule.engine.api.TbNodeException; | 31 | import org.thingsboard.rule.engine.api.TbNodeException; |
32 | +import org.thingsboard.rule.engine.api.util.TbNodeUtils; | ||
32 | import org.thingsboard.server.common.data.id.EntityId; | 33 | import org.thingsboard.server.common.data.id.EntityId; |
33 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; | 34 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
34 | import org.thingsboard.server.common.data.kv.KvEntry; | 35 | import org.thingsboard.server.common.data.kv.KvEntry; |
@@ -91,10 +92,10 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC | @@ -91,10 +92,10 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC | ||
91 | } | 92 | } |
92 | ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>(); | 93 | ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>(); |
93 | ListenableFuture<List<Void>> allFutures = Futures.allAsList( | 94 | ListenableFuture<List<Void>> allFutures = Futures.allAsList( |
94 | - putLatestTelemetry(ctx, entityId, msg, LATEST_TS, config.getLatestTsKeyNames(), failuresMap), | ||
95 | - putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, config.getClientAttributeNames(), failuresMap, "cs_"), | ||
96 | - putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, config.getSharedAttributeNames(), failuresMap, "shared_"), | ||
97 | - putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, config.getServerAttributeNames(), failuresMap, "ss_") | 95 | + putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg.getMetaData()), failuresMap), |
96 | + putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg.getMetaData()), failuresMap, "cs_"), | ||
97 | + putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg.getMetaData()), failuresMap, "shared_"), | ||
98 | + putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg.getMetaData()), failuresMap, "ss_") | ||
98 | ); | 99 | ); |
99 | withCallback(allFutures, i -> { | 100 | withCallback(allFutures, i -> { |
100 | if (!failuresMap.isEmpty()) { | 101 | if (!failuresMap.isEmpty()) { |
@@ -103,9 +103,10 @@ public class TbGetTelemetryNode implements TbNode { | @@ -103,9 +103,10 @@ public class TbGetTelemetryNode implements TbNode { | ||
103 | if (config.isUseMetadataIntervalPatterns()) { | 103 | if (config.isUseMetadataIntervalPatterns()) { |
104 | checkMetadataKeyPatterns(msg); | 104 | checkMetadataKeyPatterns(msg); |
105 | } | 105 | } |
106 | - ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg)); | 106 | + List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg.getMetaData()); |
107 | + ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys)); | ||
107 | DonAsynchron.withCallback(list, data -> { | 108 | DonAsynchron.withCallback(list, data -> { |
108 | - process(data, msg); | 109 | + process(data, msg, keys); |
109 | ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData())); | 110 | ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData())); |
110 | }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor()); | 111 | }, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor()); |
111 | } catch (Exception e) { | 112 | } catch (Exception e) { |
@@ -118,8 +119,8 @@ public class TbGetTelemetryNode implements TbNode { | @@ -118,8 +119,8 @@ public class TbGetTelemetryNode implements TbNode { | ||
118 | public void destroy() { | 119 | public void destroy() { |
119 | } | 120 | } |
120 | 121 | ||
121 | - private List<ReadTsKvQuery> buildQueries(TbMsg msg) { | ||
122 | - return tsKeyNames.stream() | 122 | + private List<ReadTsKvQuery> buildQueries(TbMsg msg, List<String> keys) { |
123 | + return keys.stream() | ||
123 | .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, getOrderBy())) | 124 | .map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, getOrderBy())) |
124 | .collect(Collectors.toList()); | 125 | .collect(Collectors.toList()); |
125 | } | 126 | } |
@@ -135,7 +136,7 @@ public class TbGetTelemetryNode implements TbNode { | @@ -135,7 +136,7 @@ public class TbGetTelemetryNode implements TbNode { | ||
135 | } | 136 | } |
136 | } | 137 | } |
137 | 138 | ||
138 | - private void process(List<TsKvEntry> entries, TbMsg msg) { | 139 | + private void process(List<TsKvEntry> entries, TbMsg msg, List<String> keys) { |
139 | ObjectNode resultNode = mapper.createObjectNode(); | 140 | ObjectNode resultNode = mapper.createObjectNode(); |
140 | if (FETCH_MODE_ALL.equals(fetchMode)) { | 141 | if (FETCH_MODE_ALL.equals(fetchMode)) { |
141 | entries.forEach(entry -> processArray(resultNode, entry)); | 142 | entries.forEach(entry -> processArray(resultNode, entry)); |
@@ -143,7 +144,7 @@ public class TbGetTelemetryNode implements TbNode { | @@ -143,7 +144,7 @@ public class TbGetTelemetryNode implements TbNode { | ||
143 | entries.forEach(entry -> processSingle(resultNode, entry)); | 144 | entries.forEach(entry -> processSingle(resultNode, entry)); |
144 | } | 145 | } |
145 | 146 | ||
146 | - for (String key : tsKeyNames) { | 147 | + for (String key : keys) { |
147 | if (resultNode.has(key)) { | 148 | if (resultNode.has(key)) { |
148 | msg.getMetaData().putValue(key, resultNode.get(key).toString()); | 149 | msg.getMetaData().putValue(key, resultNode.get(key).toString()); |
149 | } | 150 | } |
@@ -69,6 +69,13 @@ queue: | @@ -69,6 +69,13 @@ queue: | ||
69 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" | 69 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" |
70 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" | 70 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" |
71 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" | 71 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" |
72 | + use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" | ||
73 | + confluent: | ||
74 | + ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" | ||
75 | + sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" | ||
76 | + sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" | ||
77 | + security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" | ||
78 | + other: | ||
72 | topic-properties: | 79 | topic-properties: |
73 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 80 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
74 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 81 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
@@ -76,6 +83,7 @@ queue: | @@ -76,6 +83,7 @@ queue: | ||
76 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 83 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
77 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" | 84 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" |
78 | aws_sqs: | 85 | aws_sqs: |
86 | + use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" | ||
79 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" | 87 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" |
80 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" | 88 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
81 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" | 89 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |
@@ -62,6 +62,13 @@ queue: | @@ -62,6 +62,13 @@ queue: | ||
62 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" | 62 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" |
63 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" | 63 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" |
64 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" | 64 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" |
65 | + use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" | ||
66 | + confluent: | ||
67 | + ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" | ||
68 | + sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" | ||
69 | + sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" | ||
70 | + security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" | ||
71 | + other: | ||
65 | topic-properties: | 72 | topic-properties: |
66 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 73 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
67 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 74 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
@@ -69,6 +76,7 @@ queue: | @@ -69,6 +76,7 @@ queue: | ||
69 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 76 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
70 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" | 77 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" |
71 | aws_sqs: | 78 | aws_sqs: |
79 | + use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" | ||
72 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" | 80 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" |
73 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" | 81 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
74 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" | 82 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |
@@ -90,6 +90,13 @@ queue: | @@ -90,6 +90,13 @@ queue: | ||
90 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" | 90 | linger.ms: "${TB_KAFKA_LINGER_MS:1}" |
91 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" | 91 | buffer.memory: "${TB_BUFFER_MEMORY:33554432}" |
92 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" | 92 | replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}" |
93 | + use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" | ||
94 | + confluent: | ||
95 | + ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" | ||
96 | + sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" | ||
97 | + sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" | ||
98 | + security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" | ||
99 | + other: | ||
93 | topic-properties: | 100 | topic-properties: |
94 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 101 | rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
95 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 102 | core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
@@ -97,6 +104,7 @@ queue: | @@ -97,6 +104,7 @@ queue: | ||
97 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" | 104 | notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" |
98 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" | 105 | js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" |
99 | aws_sqs: | 106 | aws_sqs: |
107 | + use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}" | ||
100 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" | 108 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" |
101 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" | 109 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
102 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" | 110 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |