Commit d654e09d6a52c0a0d038ac176f36214c0dce25b2

Authored by Yevhen Bondarenko
Committed by GitHub
1 parent e1f914e2

kafka topic settings

* added topic-properties to kafka queue

* added topic-properties to kafka queue to transport

* kafka topic settings improvements
... ... @@ -521,6 +521,13 @@ queue:
521 521 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
522 522 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
523 523 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
  524 + replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
  525 + topic-properties:
  526 + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  527 + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  528 + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  529 + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  530 + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
524 531 aws_sqs:
525 532 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
526 533 secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}"
... ...
... ... @@ -19,15 +19,14 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.apache.kafka.clients.admin.AdminClient;
20 20 import org.apache.kafka.clients.admin.CreateTopicsResult;
21 21 import org.apache.kafka.clients.admin.NewTopic;
22   -import org.apache.kafka.clients.admin.TopicDescription;
23   -import org.apache.kafka.common.KafkaFuture;
24 22 import org.apache.kafka.common.errors.TopicExistsException;
25 23 import org.thingsboard.server.queue.TbQueueAdmin;
26 24
27 25 import java.util.Collections;
  26 +import java.util.Map;
  27 +import java.util.Set;
  28 +import java.util.concurrent.ConcurrentHashMap;
28 29 import java.util.concurrent.ExecutionException;
29   -import java.util.concurrent.TimeUnit;
30   -import java.util.concurrent.TimeoutException;
31 30
32 31 /**
33 32 * Created by ashvayka on 24.09.18.
... ... @@ -35,17 +34,34 @@ import java.util.concurrent.TimeoutException;
35 34 @Slf4j
36 35 public class TBKafkaAdmin implements TbQueueAdmin {
37 36
38   - AdminClient client;
  37 + private final AdminClient client;
  38 + private final Map<String, String> topicConfigs;
  39 + private final Set<String> topics = ConcurrentHashMap.newKeySet();
39 40
40   - public TBKafkaAdmin(TbKafkaSettings settings) {
  41 + private final short replicationFactor;
  42 +
  43 + public TBKafkaAdmin(TbKafkaSettings settings, Map<String, String> topicConfigs) {
41 44 client = AdminClient.create(settings.toProps());
  45 + this.topicConfigs = topicConfigs;
  46 +
  47 + try {
  48 + topics.addAll(client.listTopics().names().get());
  49 + } catch (InterruptedException | ExecutionException e) {
  50 + log.error("Failed to get all topics.", e);
  51 + }
  52 +
  53 + replicationFactor = settings.getReplicationFactor();
42 54 }
43 55
44   - //TODO 2.5 - ybondarenko Need to pass not only settings but also properties for topic creation. Somewhere in thingsboard.yml, in KV format.
45 56 @Override
46 57 public void createTopicIfNotExists(String topic) {
  58 + if (topics.contains(topic)) {
  59 + return;
  60 + }
47 61 try {
48   - createTopic(new NewTopic(topic, 1, (short) 1)).values().get(topic).get();
  62 + NewTopic newTopic = new NewTopic(topic, 1, replicationFactor).configs(topicConfigs);
  63 + createTopic(newTopic).values().get(topic).get();
  64 + topics.add(topic);
49 65 } catch (ExecutionException ee) {
50 66 if (ee.getCause() instanceof TopicExistsException) {
51 67 //do nothing
... ... @@ -57,19 +73,10 @@ public class TBKafkaAdmin implements TbQueueAdmin {
57 73 log.warn("[{}] Failed to create topic", topic, e);
58 74 throw new RuntimeException(e);
59 75 }
  76 +
60 77 }
61 78
62 79 public CreateTopicsResult createTopic(NewTopic topic) {
63 80 return client.createTopics(Collections.singletonList(topic));
64 81 }
65   -
66   - private boolean topicExists(String topic) throws InterruptedException {
67   - KafkaFuture<TopicDescription> topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic);
68   - try {
69   - topicDescriptionFuture.get();
70   - return true;
71   - } catch (ExecutionException e) {
72   - return false;
73   - }
74   - }
75 82 }
... ...
... ... @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
23 23 import org.apache.kafka.clients.consumer.ConsumerRecords;
24 24 import org.apache.kafka.clients.consumer.KafkaConsumer;
25 25 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  26 +import org.thingsboard.server.queue.TbQueueAdmin;
26 27 import org.thingsboard.server.queue.TbQueueConsumer;
27 28 import org.thingsboard.server.queue.TbQueueMsg;
28 29
... ... @@ -43,7 +44,7 @@ import java.util.stream.Collectors;
43 44 @Slf4j
44 45 public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
45 46
46   - private final TBKafkaAdmin admin;
  47 + private final TbQueueAdmin admin;
47 48 private final KafkaConsumer<String, byte[]> consumer;
48 49 private final TbKafkaDecoder<T> decoder;
49 50 private volatile boolean subscribed;
... ... @@ -57,7 +58,8 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
57 58 private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
58 59 String clientId, String groupId, String topic,
59 60 boolean autoCommit, int autoCommitIntervalMs,
60   - int maxPollRecords) {
  61 + int maxPollRecords,
  62 + TbQueueAdmin admin) {
61 63 Properties props = settings.toProps();
62 64 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
63 65 if (groupId != null) {
... ... @@ -70,7 +72,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
70 72 if (maxPollRecords > 0) {
71 73 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
72 74 }
73   - this.admin = new TBKafkaAdmin(settings);
  75 + this.admin = admin;
74 76 this.consumer = new KafkaConsumer<>(props);
75 77 this.decoder = decoder;
76 78 this.topic = topic;
... ...
... ... @@ -24,12 +24,15 @@ import org.apache.kafka.clients.producer.ProducerRecord;
24 24 import org.apache.kafka.common.header.Header;
25 25 import org.apache.kafka.common.header.internals.RecordHeader;
26 26 import org.springframework.util.StringUtils;
  27 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  28 +import org.thingsboard.server.queue.TbQueueAdmin;
27 29 import org.thingsboard.server.queue.TbQueueCallback;
28 30 import org.thingsboard.server.queue.TbQueueMsg;
29 31 import org.thingsboard.server.queue.TbQueueProducer;
30   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
31 32
32 33 import java.util.Properties;
  34 +import java.util.Set;
  35 +import java.util.concurrent.ConcurrentHashMap;
33 36 import java.util.stream.Collectors;
34 37
35 38 /**
... ... @@ -46,8 +49,12 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
46 49 @Getter
47 50 private final TbKafkaSettings settings;
48 51
  52 + private final TbQueueAdmin admin;
  53 +
  54 + private final Set<TopicPartitionInfo> topics;
  55 +
49 56 @Builder
50   - private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaPartitioner<T> partitioner, String defaultTopic, String clientId) {
  57 + private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaPartitioner<T> partitioner, String defaultTopic, String clientId, TbQueueAdmin admin) {
51 58 Properties props = settings.toProps();
52 59 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
53 60 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
... ... @@ -57,6 +64,8 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
57 64 this.settings = settings;
58 65 this.producer = new KafkaProducer<>(props);
59 66 this.defaultTopic = defaultTopic;
  67 + this.admin = admin;
  68 + topics = ConcurrentHashMap.newKeySet();
60 69 }
61 70
62 71 @Override
... ... @@ -65,6 +74,7 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
65 74
66 75 @Override
67 76 public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
  77 + createTopicIfNotExist(tpi);
68 78 String key = msg.getKey().toString();
69 79 byte[] data = msg.getData();
70 80 ProducerRecord<String, byte[]> record;
... ... @@ -85,8 +95,18 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro
85 95 });
86 96 }
87 97
  98 + private void createTopicIfNotExist(TopicPartitionInfo tpi) {
  99 + if (topics.contains(tpi)) {
  100 + return;
  101 + }
  102 + admin.createTopicIfNotExists(tpi.getFullTopicName());
  103 + topics.add(tpi);
  104 + }
  105 +
88 106 @Override
89 107 public void stop() {
90   -
  108 + if (producer != null) {
  109 + producer.close();
  110 + }
91 111 }
92 112 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.queue.kafka;
17 17
  18 +import lombok.Getter;
18 19 import lombok.extern.slf4j.Slf4j;
19 20 import org.apache.kafka.clients.producer.ProducerConfig;
20 21 import org.springframework.beans.factory.annotation.Value;
... ... @@ -32,9 +33,6 @@ import java.util.Properties;
32 33 @Component
33 34 public class TbKafkaSettings {
34 35
35   - static final String REQUEST_ID_HEADER = "requestId";
36   - static final String RESPONSE_TOPIC_HEADER = "responseTopic";
37   -
38 36 @Value("${queue.kafka.bootstrap.servers}")
39 37 private String servers;
40 38
... ... @@ -53,6 +51,10 @@ public class TbKafkaSettings {
53 51 @Value("${queue.kafka.buffer.memory}")
54 52 private long bufferMemory;
55 53
  54 + @Value("${queue.kafka.replication_factor}")
  55 + @Getter
  56 + private short replicationFactor;
  57 +
56 58 @Value("${kafka.other:#{null}}")
57 59 private List<TbKafkaProperty> other;
58 60
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.kafka;
  17 +
  18 +import lombok.Getter;
  19 +import org.springframework.beans.factory.annotation.Value;
  20 +import org.springframework.stereotype.Component;
  21 +
  22 +import javax.annotation.PostConstruct;
  23 +import java.util.HashMap;
  24 +import java.util.Map;
  25 +
  26 +@Component
  27 +public class TbKafkaTopicConfigs {
  28 + @Value("${queue.kafka.topic-properties.core}")
  29 + private String coreProperties;
  30 + @Value("${queue.kafka.topic-properties.rule-engine}")
  31 + private String ruleEngineProperties;
  32 + @Value("${queue.kafka.topic-properties.transport-api}")
  33 + private String transportApiProperties;
  34 + @Value("${queue.kafka.topic-properties.notifications}")
  35 + private String notificationsProperties;
  36 + @Value("${queue.kafka.topic-properties.js-executor}")
  37 + private String jsExecutorProperties;
  38 +
  39 + @Getter
  40 + private Map<String, String> coreConfigs;
  41 + @Getter
  42 + private Map<String, String> ruleEngineConfigs;
  43 + @Getter
  44 + private Map<String, String> transportApiConfigs;
  45 + @Getter
  46 + private Map<String, String> notificationsConfigs;
  47 + @Getter
  48 + private Map<String, String> jsExecutorConfigs;
  49 +
  50 + @PostConstruct
  51 + private void init() {
  52 + coreConfigs = getConfigs(coreProperties);
  53 + ruleEngineConfigs = getConfigs(ruleEngineProperties);
  54 + transportApiConfigs = getConfigs(transportApiProperties);
  55 + notificationsConfigs = getConfigs(notificationsProperties);
  56 + jsExecutorConfigs = getConfigs(jsExecutorProperties);
  57 + }
  58 +
  59 + private Map<String, String> getConfigs(String properties) {
  60 + Map<String, String> configs = new HashMap<>();
  61 + for (String property : properties.split(";")) {
  62 + int delimiterPosition = property.indexOf(":");
  63 + String key = property.substring(0, delimiterPosition);
  64 + String value = property.substring(delimiterPosition + 1);
  65 + configs.put(key, value);
  66 + }
  67 + return configs;
  68 + }
  69 +}
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotifica
28 28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
29 29 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
30 30 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  31 +import org.thingsboard.server.queue.TbQueueAdmin;
31 32 import org.thingsboard.server.queue.TbQueueConsumer;
32 33 import org.thingsboard.server.queue.TbQueueProducer;
33 34 import org.thingsboard.server.queue.TbQueueRequestTemplate;
... ... @@ -40,6 +41,7 @@ import org.thingsboard.server.queue.kafka.TBKafkaAdmin;
40 41 import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
41 42 import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
42 43 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
  44 +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
43 45 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
44 46 import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
45 47 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
... ... @@ -62,13 +64,20 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
62 64 private final TbQueueTransportNotificationSettings transportNotificationSettings;
63 65 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
64 66
  67 + private final TbQueueAdmin coreAdmin;
  68 + private final TbQueueAdmin ruleEngineAdmin;
  69 + private final TbQueueAdmin jsExecutorAdmin;
  70 + private final TbQueueAdmin transportApiAdmin;
  71 + private final TbQueueAdmin notificationAdmin;
  72 +
65 73 public KafkaMonolithQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
66 74 TbServiceInfoProvider serviceInfoProvider,
67 75 TbQueueCoreSettings coreSettings,
68 76 TbQueueRuleEngineSettings ruleEngineSettings,
69 77 TbQueueTransportApiSettings transportApiSettings,
70 78 TbQueueTransportNotificationSettings transportNotificationSettings,
71   - TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
  79 + TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  80 + TbKafkaTopicConfigs kafkaTopicConfigs) {
72 81 this.partitionService = partitionService;
73 82 this.kafkaSettings = kafkaSettings;
74 83 this.serviceInfoProvider = serviceInfoProvider;
... ... @@ -77,6 +86,12 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
77 86 this.transportApiSettings = transportApiSettings;
78 87 this.transportNotificationSettings = transportNotificationSettings;
79 88 this.jsInvokeSettings = jsInvokeSettings;
  89 +
  90 + this.coreAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
  91 + this.ruleEngineAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
  92 + this.jsExecutorAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
  93 + this.transportApiAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
  94 + this.notificationAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
80 95 }
81 96
82 97 @Override
... ... @@ -85,6 +100,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
85 100 requestBuilder.settings(kafkaSettings);
86 101 requestBuilder.clientId("monolith-transport-notifications-" + serviceInfoProvider.getServiceId());
87 102 requestBuilder.defaultTopic(transportNotificationSettings.getNotificationsTopic());
  103 + requestBuilder.admin(notificationAdmin);
88 104 return requestBuilder.build();
89 105 }
90 106
... ... @@ -94,6 +110,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
94 110 requestBuilder.settings(kafkaSettings);
95 111 requestBuilder.clientId("monolith-rule-engine-" + serviceInfoProvider.getServiceId());
96 112 requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
  113 + requestBuilder.admin(ruleEngineAdmin);
97 114 return requestBuilder.build();
98 115 }
99 116
... ... @@ -103,6 +120,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
103 120 requestBuilder.settings(kafkaSettings);
104 121 requestBuilder.clientId("monolith-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
105 122 requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
  123 + requestBuilder.admin(ruleEngineAdmin);
106 124 return requestBuilder.build();
107 125 }
108 126
... ... @@ -112,6 +130,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
112 130 requestBuilder.settings(kafkaSettings);
113 131 requestBuilder.clientId("monolith-core-" + serviceInfoProvider.getServiceId());
114 132 requestBuilder.defaultTopic(coreSettings.getTopic());
  133 + requestBuilder.admin(coreAdmin);
115 134 return requestBuilder.build();
116 135 }
117 136
... ... @@ -121,6 +140,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
121 140 requestBuilder.settings(kafkaSettings);
122 141 requestBuilder.clientId("monolith-core-notifications-" + serviceInfoProvider.getServiceId());
123 142 requestBuilder.defaultTopic(coreSettings.getTopic());
  143 + requestBuilder.admin(coreAdmin);
124 144 return requestBuilder.build();
125 145 }
126 146
... ... @@ -133,6 +153,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
133 153 consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
134 154 consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
135 155 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
  156 + consumerBuilder.admin(ruleEngineAdmin);
136 157 return consumerBuilder.build();
137 158 }
138 159
... ... @@ -144,6 +165,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
144 165 consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
145 166 consumerBuilder.groupId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
146 167 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  168 + consumerBuilder.admin(notificationAdmin);
147 169 return consumerBuilder.build();
148 170 }
149 171
... ... @@ -155,6 +177,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
155 177 consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
156 178 consumerBuilder.groupId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
157 179 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
  180 + consumerBuilder.admin(coreAdmin);
158 181 return consumerBuilder.build();
159 182 }
160 183
... ... @@ -166,6 +189,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
166 189 consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
167 190 consumerBuilder.groupId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
168 191 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  192 + consumerBuilder.admin(notificationAdmin);
169 193 return consumerBuilder.build();
170 194 }
171 195
... ... @@ -177,6 +201,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
177 201 consumerBuilder.clientId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId());
178 202 consumerBuilder.groupId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId());
179 203 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
  204 + consumerBuilder.admin(transportApiAdmin);
180 205 return consumerBuilder.build();
181 206 }
182 207
... ... @@ -186,6 +211,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
186 211 requestBuilder.settings(kafkaSettings);
187 212 requestBuilder.clientId("monolith-transport-api-producer-" + serviceInfoProvider.getServiceId());
188 213 requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
  214 + requestBuilder.admin(transportApiAdmin);
189 215 return requestBuilder.build();
190 216 }
191 217
... ... @@ -196,24 +222,24 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
196 222 requestBuilder.settings(kafkaSettings);
197 223 requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
198 224 requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
  225 + requestBuilder.admin(jsExecutorAdmin);
199 226
200 227 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder();
201 228 responseBuilder.settings(kafkaSettings);
202 229 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
203 230 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
204 231 responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
205   -// responseBuilder.autoCommit(true);
206   -// responseBuilder.autoCommitIntervalMs(autoCommitInterval);
207 232 responseBuilder.decoder(msg -> {
208 233 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
209 234 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
210 235 return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
211 236 }
212 237 );
  238 + responseBuilder.admin(jsExecutorAdmin);
213 239
214 240 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
215 241 <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
216   - builder.queueAdmin(new TBKafkaAdmin(kafkaSettings));
  242 + builder.queueAdmin(jsExecutorAdmin);
217 243 builder.requestTemplate(requestBuilder.build());
218 244 builder.responseTemplate(responseBuilder.build());
219 245 builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
... ...
... ... @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotifica
28 28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
29 29 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
30 30 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  31 +import org.thingsboard.server.queue.TbQueueAdmin;
31 32 import org.thingsboard.server.queue.TbQueueConsumer;
32 33 import org.thingsboard.server.queue.TbQueueProducer;
33 34 import org.thingsboard.server.queue.TbQueueRequestTemplate;
... ... @@ -40,6 +41,7 @@ import org.thingsboard.server.queue.kafka.TBKafkaAdmin;
40 41 import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
41 42 import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
42 43 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
  44 +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
43 45 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
44 46 import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
45 47 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
... ... @@ -59,12 +61,19 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
59 61 private final TbQueueTransportApiSettings transportApiSettings;
60 62 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
61 63
  64 + private final TbQueueAdmin coreAdmin;
  65 + private final TbQueueAdmin ruleEngineAdmin;
  66 + private final TbQueueAdmin jsExecutorAdmin;
  67 + private final TbQueueAdmin transportApiAdmin;
  68 + private final TbQueueAdmin notificationAdmin;
  69 +
62 70 public KafkaTbCoreQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
63 71 TbServiceInfoProvider serviceInfoProvider,
64 72 TbQueueCoreSettings coreSettings,
65 73 TbQueueRuleEngineSettings ruleEngineSettings,
66 74 TbQueueTransportApiSettings transportApiSettings,
67   - TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
  75 + TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  76 + TbKafkaTopicConfigs kafkaTopicConfigs) {
68 77 this.partitionService = partitionService;
69 78 this.kafkaSettings = kafkaSettings;
70 79 this.serviceInfoProvider = serviceInfoProvider;
... ... @@ -72,6 +81,12 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
72 81 this.ruleEngineSettings = ruleEngineSettings;
73 82 this.transportApiSettings = transportApiSettings;
74 83 this.jsInvokeSettings = jsInvokeSettings;
  84 +
  85 + this.coreAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
  86 + this.ruleEngineAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
  87 + this.jsExecutorAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
  88 + this.transportApiAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
  89 + this.notificationAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
75 90 }
76 91
77 92 @Override
... ... @@ -80,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
80 95 requestBuilder.settings(kafkaSettings);
81 96 requestBuilder.clientId("tb-core-transport-notifications-" + serviceInfoProvider.getServiceId());
82 97 requestBuilder.defaultTopic(coreSettings.getTopic());
  98 + requestBuilder.admin(coreAdmin);
83 99 return requestBuilder.build();
84 100 }
85 101
... ... @@ -89,6 +105,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
89 105 requestBuilder.settings(kafkaSettings);
90 106 requestBuilder.clientId("tb-core-rule-engine-" + serviceInfoProvider.getServiceId());
91 107 requestBuilder.defaultTopic(coreSettings.getTopic());
  108 + requestBuilder.admin(coreAdmin);
92 109 return requestBuilder.build();
93 110 }
94 111
... ... @@ -98,6 +115,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
98 115 requestBuilder.settings(kafkaSettings);
99 116 requestBuilder.clientId("tb-core-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
100 117 requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
  118 + requestBuilder.admin(ruleEngineAdmin);
101 119 return requestBuilder.build();
102 120 }
103 121
... ... @@ -107,6 +125,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
107 125 requestBuilder.settings(kafkaSettings);
108 126 requestBuilder.clientId("tb-core-to-core-" + serviceInfoProvider.getServiceId());
109 127 requestBuilder.defaultTopic(coreSettings.getTopic());
  128 + requestBuilder.admin(coreAdmin);
110 129 return requestBuilder.build();
111 130 }
112 131
... ... @@ -116,6 +135,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
116 135 requestBuilder.settings(kafkaSettings);
117 136 requestBuilder.clientId("tb-core-to-core-notifications-" + serviceInfoProvider.getServiceId());
118 137 requestBuilder.defaultTopic(coreSettings.getTopic());
  138 + requestBuilder.admin(coreAdmin);
119 139 return requestBuilder.build();
120 140 }
121 141
... ... @@ -127,6 +147,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
127 147 consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId());
128 148 consumerBuilder.groupId("tb-core-node-" + serviceInfoProvider.getServiceId());
129 149 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
  150 + consumerBuilder.admin(coreAdmin);
130 151 return consumerBuilder.build();
131 152 }
132 153
... ... @@ -138,6 +159,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
138 159 consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId());
139 160 consumerBuilder.groupId("tb-core-notifications-node-" + serviceInfoProvider.getServiceId());
140 161 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  162 + consumerBuilder.admin(notificationAdmin);
141 163 return consumerBuilder.build();
142 164 }
143 165
... ... @@ -149,6 +171,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
149 171 consumerBuilder.clientId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId());
150 172 consumerBuilder.groupId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId());
151 173 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
  174 + consumerBuilder.admin(transportApiAdmin);
152 175 return consumerBuilder.build();
153 176 }
154 177
... ... @@ -158,6 +181,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
158 181 requestBuilder.settings(kafkaSettings);
159 182 requestBuilder.clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId());
160 183 requestBuilder.defaultTopic(coreSettings.getTopic());
  184 + requestBuilder.admin(coreAdmin);
161 185 return requestBuilder.build();
162 186 }
163 187
... ... @@ -168,24 +192,24 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
168 192 requestBuilder.settings(kafkaSettings);
169 193 requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
170 194 requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
  195 + requestBuilder.admin(jsExecutorAdmin);
171 196
172 197 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder();
173 198 responseBuilder.settings(kafkaSettings);
174 199 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
175 200 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
176 201 responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
177   -// responseBuilder.autoCommit(true);
178   -// responseBuilder.autoCommitIntervalMs(autoCommitInterval);
179 202 responseBuilder.decoder(msg -> {
180 203 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
181 204 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
182 205 return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
183 206 }
184 207 );
  208 + responseBuilder.admin(jsExecutorAdmin);
185 209
186 210 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
187 211 <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
188   - builder.queueAdmin(new TBKafkaAdmin(kafkaSettings));
  212 + builder.queueAdmin(jsExecutorAdmin);
189 213 builder.requestTemplate(requestBuilder.build());
190 214 builder.responseTemplate(responseBuilder.build());
191 215 builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs
26 26 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
27 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
28 28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  29 +import org.thingsboard.server.queue.TbQueueAdmin;
29 30 import org.thingsboard.server.queue.TbQueueConsumer;
30 31 import org.thingsboard.server.queue.TbQueueProducer;
31 32 import org.thingsboard.server.queue.TbQueueRequestTemplate;
... ... @@ -38,6 +39,7 @@ import org.thingsboard.server.queue.kafka.TBKafkaAdmin;
38 39 import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
39 40 import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
40 41 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
  42 +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
41 43 import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
42 44 import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
43 45 import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
... ... @@ -56,17 +58,28 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
56 58 private final TbQueueRuleEngineSettings ruleEngineSettings;
57 59 private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
58 60
  61 + private final TbQueueAdmin coreAdmin;
  62 + private final TbQueueAdmin ruleEngineAdmin;
  63 + private final TbQueueAdmin jsExecutorAdmin;
  64 + private final TbQueueAdmin notificationAdmin;
  65 +
59 66 public KafkaTbRuleEngineQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
60 67 TbServiceInfoProvider serviceInfoProvider,
61 68 TbQueueCoreSettings coreSettings,
62 69 TbQueueRuleEngineSettings ruleEngineSettings,
63   - TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
  70 + TbQueueRemoteJsInvokeSettings jsInvokeSettings,
  71 + TbKafkaTopicConfigs kafkaTopicConfigs) {
64 72 this.partitionService = partitionService;
65 73 this.kafkaSettings = kafkaSettings;
66 74 this.serviceInfoProvider = serviceInfoProvider;
67 75 this.coreSettings = coreSettings;
68 76 this.ruleEngineSettings = ruleEngineSettings;
69 77 this.jsInvokeSettings = jsInvokeSettings;
  78 +
  79 + this.coreAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
  80 + this.ruleEngineAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
  81 + this.jsExecutorAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
  82 + this.notificationAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
70 83 }
71 84
72 85 @Override
... ... @@ -75,6 +88,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
75 88 requestBuilder.settings(kafkaSettings);
76 89 requestBuilder.clientId("tb-rule-engine-transport-notifications-" + serviceInfoProvider.getServiceId());
77 90 requestBuilder.defaultTopic(coreSettings.getTopic());
  91 + requestBuilder.admin(coreAdmin);
78 92 return requestBuilder.build();
79 93 }
80 94
... ... @@ -84,6 +98,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
84 98 requestBuilder.settings(kafkaSettings);
85 99 requestBuilder.clientId("tb-rule-engine-to-rule-engine-" + serviceInfoProvider.getServiceId());
86 100 requestBuilder.defaultTopic(coreSettings.getTopic());
  101 + requestBuilder.admin(coreAdmin);
87 102 return requestBuilder.build();
88 103 }
89 104
... ... @@ -93,6 +108,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
93 108 requestBuilder.settings(kafkaSettings);
94 109 requestBuilder.clientId("tb-rule-engine-to-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
95 110 requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
  111 + requestBuilder.admin(ruleEngineAdmin);
96 112 return requestBuilder.build();
97 113 }
98 114
... ... @@ -103,6 +119,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
103 119 requestBuilder.settings(kafkaSettings);
104 120 requestBuilder.clientId("tb-rule-engine-to-core-" + serviceInfoProvider.getServiceId());
105 121 requestBuilder.defaultTopic(coreSettings.getTopic());
  122 + requestBuilder.admin(coreAdmin);
106 123 return requestBuilder.build();
107 124 }
108 125
... ... @@ -112,6 +129,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
112 129 requestBuilder.settings(kafkaSettings);
113 130 requestBuilder.clientId("tb-rule-engine-to-core-notifications-" + serviceInfoProvider.getServiceId());
114 131 requestBuilder.defaultTopic(coreSettings.getTopic());
  132 + requestBuilder.admin(coreAdmin);
115 133 return requestBuilder.build();
116 134 }
117 135
... ... @@ -124,6 +142,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
124 142 consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
125 143 consumerBuilder.groupId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
126 144 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
  145 + consumerBuilder.admin(ruleEngineAdmin);
127 146 return consumerBuilder.build();
128 147 }
129 148
... ... @@ -135,6 +154,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
135 154 consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId());
136 155 consumerBuilder.groupId("tb-rule-engine-notifications-node-" + serviceInfoProvider.getServiceId());
137 156 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  157 + consumerBuilder.admin(notificationAdmin);
138 158 return consumerBuilder.build();
139 159 }
140 160
... ... @@ -145,24 +165,24 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
145 165 requestBuilder.settings(kafkaSettings);
146 166 requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId());
147 167 requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic());
  168 + requestBuilder.admin(jsExecutorAdmin);
148 169
149 170 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder();
150 171 responseBuilder.settings(kafkaSettings);
151 172 responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId());
152 173 responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId());
153 174 responseBuilder.groupId("rule-engine-node-" + serviceInfoProvider.getServiceId());
154   -// responseBuilder.autoCommit(true);
155   -// responseBuilder.autoCommitIntervalMs(autoCommitInterval);
156 175 responseBuilder.decoder(msg -> {
157 176 JsInvokeProtos.RemoteJsResponse.Builder builder = JsInvokeProtos.RemoteJsResponse.newBuilder();
158 177 JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), builder);
159 178 return new TbProtoQueueMsg<>(msg.getKey(), builder.build(), msg.getHeaders());
160 179 }
161 180 );
  181 + responseBuilder.admin(jsExecutorAdmin);
162 182
163 183 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
164 184 <TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> builder = DefaultTbQueueRequestTemplate.builder();
165   - builder.queueAdmin(new TBKafkaAdmin(kafkaSettings));
  185 + builder.queueAdmin(jsExecutorAdmin);
166 186 builder.requestTemplate(requestBuilder.build());
167 187 builder.responseTemplate(responseBuilder.build());
168 188 builder.maxPendingRequests(jsInvokeSettings.getMaxPendingRequests());
... ...
... ... @@ -18,25 +18,27 @@ package org.thingsboard.server.queue.provider;
18 18 import lombok.extern.slf4j.Slf4j;
19 19 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
20 20 import org.springframework.stereotype.Component;
21   -import org.thingsboard.server.queue.TbQueueConsumer;
22   -import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
23   -import org.thingsboard.server.queue.TbQueueProducer;
24   -import org.thingsboard.server.queue.TbQueueRequestTemplate;
25   -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
26   -import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
27   -import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
28   -import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
29   -import org.thingsboard.server.queue.common.TbProtoQueueMsg;
30 21 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
31 22 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
32 23 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
33 24 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
34 25 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  26 +import org.thingsboard.server.queue.TbQueueAdmin;
  27 +import org.thingsboard.server.queue.TbQueueConsumer;
  28 +import org.thingsboard.server.queue.TbQueueProducer;
  29 +import org.thingsboard.server.queue.TbQueueRequestTemplate;
  30 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
  31 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
35 32 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
36 33 import org.thingsboard.server.queue.kafka.TBKafkaAdmin;
37 34 import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate;
38 35 import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate;
39 36 import org.thingsboard.server.queue.kafka.TbKafkaSettings;
  37 +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
  38 +import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  39 +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
  40 +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
  41 +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
40 42
41 43 @Component
42 44 @ConditionalOnExpression("'${queue.type:null}'=='kafka' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
... ... @@ -50,18 +52,29 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
50 52 private final TbQueueTransportApiSettings transportApiSettings;
51 53 private final TbQueueTransportNotificationSettings transportNotificationSettings;
52 54
  55 + private final TbQueueAdmin coreAdmin;
  56 + private final TbQueueAdmin ruleEngineAdmin;
  57 + private final TbQueueAdmin transportApiAdmin;
  58 + private final TbQueueAdmin notificationAdmin;
  59 +
53 60 public KafkaTbTransportQueueFactory(TbKafkaSettings kafkaSettings,
54 61 TbServiceInfoProvider serviceInfoProvider,
55 62 TbQueueCoreSettings coreSettings,
56 63 TbQueueRuleEngineSettings ruleEngineSettings,
57 64 TbQueueTransportApiSettings transportApiSettings,
58   - TbQueueTransportNotificationSettings transportNotificationSettings) {
  65 + TbQueueTransportNotificationSettings transportNotificationSettings,
  66 + TbKafkaTopicConfigs kafkaTopicConfigs) {
59 67 this.kafkaSettings = kafkaSettings;
60 68 this.serviceInfoProvider = serviceInfoProvider;
61 69 this.coreSettings = coreSettings;
62 70 this.ruleEngineSettings = ruleEngineSettings;
63 71 this.transportApiSettings = transportApiSettings;
64 72 this.transportNotificationSettings = transportNotificationSettings;
  73 +
  74 + this.coreAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
  75 + this.ruleEngineAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
  76 + this.transportApiAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
  77 + this.notificationAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
65 78 }
66 79
67 80 @Override
... ... @@ -70,6 +83,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
70 83 requestBuilder.settings(kafkaSettings);
71 84 requestBuilder.clientId("transport-api-request-" + serviceInfoProvider.getServiceId());
72 85 requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic());
  86 + requestBuilder.admin(transportApiAdmin);
73 87
74 88 TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> responseBuilder = TBKafkaConsumerTemplate.builder();
75 89 responseBuilder.settings(kafkaSettings);
... ... @@ -77,10 +91,11 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
77 91 responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId());
78 92 responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
79 93 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
  94 + responseBuilder.admin(transportApiAdmin);
80 95
81 96 DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
82 97 <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
83   - templateBuilder.queueAdmin(new TBKafkaAdmin(kafkaSettings));
  98 + templateBuilder.queueAdmin(transportApiAdmin);
84 99 templateBuilder.requestTemplate(requestBuilder.build());
85 100 templateBuilder.responseTemplate(responseBuilder.build());
86 101 templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
... ... @@ -93,8 +108,9 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
93 108 public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
94 109 TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TBKafkaProducerTemplate.builder();
95 110 requestBuilder.settings(kafkaSettings);
96   - requestBuilder.clientId("transport-node-rule-engine-"+ serviceInfoProvider.getServiceId());
  111 + requestBuilder.clientId("transport-node-rule-engine-" + serviceInfoProvider.getServiceId());
97 112 requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
  113 + requestBuilder.admin(ruleEngineAdmin);
98 114 return requestBuilder.build();
99 115 }
100 116
... ... @@ -104,6 +120,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
104 120 requestBuilder.settings(kafkaSettings);
105 121 requestBuilder.clientId("transport-node-core-" + serviceInfoProvider.getServiceId());
106 122 requestBuilder.defaultTopic(coreSettings.getTopic());
  123 + requestBuilder.admin(coreAdmin);
107 124 return requestBuilder.build();
108 125 }
109 126
... ... @@ -115,6 +132,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory {
115 132 responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId());
116 133 responseBuilder.groupId("transport-node-" + serviceInfoProvider.getServiceId());
117 134 responseBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
  135 + responseBuilder.admin(notificationAdmin);
118 136 return responseBuilder.build();
119 137 }
120 138 }
... ...
... ... @@ -50,6 +50,13 @@ queue:
50 50 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
51 51 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
52 52 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
  53 + replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
  54 + topic-properties:
  55 + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  56 + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  57 + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  58 + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  59 + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
53 60 aws_sqs:
54 61 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
55 62 secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}"
... ...
... ... @@ -51,6 +51,13 @@ queue:
51 51 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
52 52 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
53 53 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
  54 + replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
  55 + topic-properties:
  56 + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  57 + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  58 + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  59 + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  60 + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
54 61 aws_sqs:
55 62 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
56 63 secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}"
... ...
... ... @@ -81,6 +81,13 @@ queue:
81 81 batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
82 82 linger.ms: "${TB_KAFKA_LINGER_MS:1}"
83 83 buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
  84 + replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
  85 + topic-properties:
  86 + rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  87 + core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  88 + transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  89 + notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
  90 + js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
84 91 aws_sqs:
85 92 access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
86 93 secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}"
... ...