Commit 9fa82078a02d56ce1f3835bc6788b97ed52c54c8
Committed by
Andrew Shvayka
1 parent
0a136588
created TbAwsSqsQueueAttributes and added queue properties to yml
Showing
26 changed files
with
334 additions
and
214 deletions
... | ... | @@ -533,7 +533,13 @@ queue: |
533 | 533 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
534 | 534 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |
535 | 535 | threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" |
536 | - visibility_timeout: "${TB_QUEUE_AWS_SQS_VISIBILITY_TIMEOUT:30}" #In seconds. If messages wont commit in this time, messages will poll again | |
536 | + queue-properties: | |
537 | + rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
538 | + core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
539 | + transport-api: "${TB_QUEUE_AWS_SQS_TA_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
540 | + notifications: "${TB_QUEUE_AWS_SQS_NOTIFICATIONS_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
541 | + js-executor: "${TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
542 | + # VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds | |
537 | 543 | pubsub: |
538 | 544 | project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" |
539 | 545 | service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" | ... | ... |
... | ... | @@ -23,7 +23,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
23 | 23 | import org.springframework.stereotype.Component; |
24 | 24 | import org.thingsboard.server.queue.TbQueueAdmin; |
25 | 25 | |
26 | -import javax.annotation.PreDestroy; | |
27 | 26 | import java.io.IOException; |
28 | 27 | import java.util.Set; |
29 | 28 | import java.util.concurrent.ConcurrentHashMap; |
... | ... | @@ -68,8 +67,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin { |
68 | 67 | } |
69 | 68 | } |
70 | 69 | |
71 | - @PreDestroy | |
72 | - private void destroy() { | |
70 | + public void destroy() { | |
73 | 71 | try { |
74 | 72 | client.close(); |
75 | 73 | } catch (IOException e) { | ... | ... |
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java
renamed from
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TBKafkaAdmin.java
... | ... | @@ -32,7 +32,7 @@ import java.util.concurrent.ExecutionException; |
32 | 32 | * Created by ashvayka on 24.09.18. |
33 | 33 | */ |
34 | 34 | @Slf4j |
35 | -public class TBKafkaAdmin implements TbQueueAdmin { | |
35 | +public class TbKafkaAdmin implements TbQueueAdmin { | |
36 | 36 | |
37 | 37 | private final AdminClient client; |
38 | 38 | private final Map<String, String> topicConfigs; |
... | ... | @@ -40,7 +40,7 @@ public class TBKafkaAdmin implements TbQueueAdmin { |
40 | 40 | |
41 | 41 | private final short replicationFactor; |
42 | 42 | |
43 | - public TBKafkaAdmin(TbKafkaSettings settings, Map<String, String> topicConfigs) { | |
43 | + public TbKafkaAdmin(TbKafkaSettings settings, Map<String, String> topicConfigs) { | |
44 | 44 | client = AdminClient.create(settings.toProps()); |
45 | 45 | this.topicConfigs = topicConfigs; |
46 | 46 | |
... | ... | @@ -76,6 +76,13 @@ public class TBKafkaAdmin implements TbQueueAdmin { |
76 | 76 | |
77 | 77 | } |
78 | 78 | |
79 | + @Override | |
80 | + public void destroy() { | |
81 | + if (client != null) { | |
82 | + client.close(); | |
83 | + } | |
84 | + } | |
85 | + | |
79 | 86 | public CreateTopicsResult createTopic(NewTopic topic) { |
80 | 87 | return client.createTopics(Collections.singletonList(topic)); |
81 | 88 | } | ... | ... |
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java
renamed from
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TBKafkaConsumerTemplate.java
... | ... | @@ -42,7 +42,7 @@ import java.util.stream.Collectors; |
42 | 42 | * Created by ashvayka on 24.09.18. |
43 | 43 | */ |
44 | 44 | @Slf4j |
45 | -public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { | |
45 | +public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { | |
46 | 46 | |
47 | 47 | private final TbQueueAdmin admin; |
48 | 48 | private final KafkaConsumer<String, byte[]> consumer; |
... | ... | @@ -55,7 +55,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon |
55 | 55 | private final String topic; |
56 | 56 | |
57 | 57 | @Builder |
58 | - private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, | |
58 | + private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, | |
59 | 59 | String clientId, String groupId, String topic, |
60 | 60 | boolean autoCommit, int autoCommitIntervalMs, |
61 | 61 | int maxPollRecords, | ... | ... |
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaHandler.java
deleted
100644 → 0
1 | -/** | |
2 | - * Copyright © 2016-2020 The Thingsboard Authors | |
3 | - * | |
4 | - * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | - * you may not use this file except in compliance with the License. | |
6 | - * You may obtain a copy of the License at | |
7 | - * | |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | - * | |
10 | - * Unless required by applicable law or agreed to in writing, software | |
11 | - * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | - * See the License for the specific language governing permissions and | |
14 | - * limitations under the License. | |
15 | - */ | |
16 | -package org.thingsboard.server.queue.kafka; | |
17 | - | |
18 | -import com.google.common.util.concurrent.ListenableFuture; | |
19 | - | |
20 | -/** | |
21 | - * Created by ashvayka on 05.10.18. | |
22 | - */ | |
23 | -public interface TbKafkaHandler<Request, Response> { | |
24 | - | |
25 | - ListenableFuture<Response> handle(Request request); | |
26 | - | |
27 | -} |
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaPartitioner.java
deleted
100644 → 0
1 | -/** | |
2 | - * Copyright © 2016-2020 The Thingsboard Authors | |
3 | - * | |
4 | - * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | - * you may not use this file except in compliance with the License. | |
6 | - * You may obtain a copy of the License at | |
7 | - * | |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | - * | |
10 | - * Unless required by applicable law or agreed to in writing, software | |
11 | - * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | - * See the License for the specific language governing permissions and | |
14 | - * limitations under the License. | |
15 | - */ | |
16 | -package org.thingsboard.server.queue.kafka; | |
17 | - | |
18 | -import org.apache.kafka.clients.producer.Partitioner; | |
19 | -import org.apache.kafka.common.PartitionInfo; | |
20 | - | |
21 | -import java.util.List; | |
22 | - | |
23 | -/** | |
24 | - * Created by ashvayka on 25.09.18. | |
25 | - */ | |
26 | -public interface TbKafkaPartitioner<T> extends Partitioner { | |
27 | - | |
28 | - int partition(String topic, String key, T value, byte[] encodedValue, List<PartitionInfo> partitions); | |
29 | - | |
30 | -} |
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaProducerTemplate.java
renamed from
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TBKafkaProducerTemplate.java
... | ... | @@ -39,7 +39,7 @@ import java.util.stream.Collectors; |
39 | 39 | * Created by ashvayka on 24.09.18. |
40 | 40 | */ |
41 | 41 | @Slf4j |
42 | -public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> { | |
42 | +public class TbKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> { | |
43 | 43 | |
44 | 44 | private final KafkaProducer<String, byte[]> producer; |
45 | 45 | |
... | ... | @@ -54,7 +54,7 @@ public class TBKafkaProducerTemplate<T extends TbQueueMsg> implements TbQueuePro |
54 | 54 | private final Set<TopicPartitionInfo> topics; |
55 | 55 | |
56 | 56 | @Builder |
57 | - private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaPartitioner<T> partitioner, String defaultTopic, String clientId, TbQueueAdmin admin) { | |
57 | + private TbKafkaProducerTemplate(TbKafkaSettings settings, String defaultTopic, String clientId, TbQueueAdmin admin) { | |
58 | 58 | Properties props = settings.toProps(); |
59 | 59 | props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); |
60 | 60 | props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.queue.kafka; |
17 | 17 | |
18 | 18 | import lombok.Getter; |
19 | 19 | import org.springframework.beans.factory.annotation.Value; |
20 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | |
20 | 21 | import org.springframework.stereotype.Component; |
21 | 22 | |
22 | 23 | import javax.annotation.PostConstruct; |
... | ... | @@ -24,6 +25,7 @@ import java.util.HashMap; |
24 | 25 | import java.util.Map; |
25 | 26 | |
26 | 27 | @Component |
28 | +@ConditionalOnExpression("'${queue.type:null}'=='kafka'") | |
27 | 29 | public class TbKafkaTopicConfigs { |
28 | 30 | @Value("${queue.kafka.topic-properties.core}") |
29 | 31 | private String coreProperties; | ... | ... |
... | ... | @@ -36,6 +36,7 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
36 | 36 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; |
37 | 37 | import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; |
38 | 38 | import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; |
39 | +import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; | |
39 | 40 | import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; |
40 | 41 | |
41 | 42 | @Component |
... | ... | @@ -49,14 +50,20 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng |
49 | 50 | private final TbQueueTransportApiSettings transportApiSettings; |
50 | 51 | private final TbQueueTransportNotificationSettings transportNotificationSettings; |
51 | 52 | private final TbAwsSqsSettings sqsSettings; |
52 | - private final TbQueueAdmin admin; | |
53 | + | |
54 | + private final TbQueueAdmin coreAdmin; | |
55 | + private final TbQueueAdmin ruleEngineAdmin; | |
56 | + private final TbQueueAdmin jsExecutorAdmin; | |
57 | + private final TbQueueAdmin transportApiAdmin; | |
58 | + private final TbQueueAdmin notificationAdmin; | |
53 | 59 | |
54 | 60 | public AwsSqsMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, |
55 | 61 | TbQueueRuleEngineSettings ruleEngineSettings, |
56 | 62 | TbServiceInfoProvider serviceInfoProvider, |
57 | 63 | TbQueueTransportApiSettings transportApiSettings, |
58 | 64 | TbQueueTransportNotificationSettings transportNotificationSettings, |
59 | - TbAwsSqsSettings sqsSettings) { | |
65 | + TbAwsSqsSettings sqsSettings, | |
66 | + TbAwsSqsQueueAttributes sqsQueueAttributes) { | |
60 | 67 | this.partitionService = partitionService; |
61 | 68 | this.coreSettings = coreSettings; |
62 | 69 | this.serviceInfoProvider = serviceInfoProvider; |
... | ... | @@ -64,69 +71,74 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng |
64 | 71 | this.transportApiSettings = transportApiSettings; |
65 | 72 | this.transportNotificationSettings = transportNotificationSettings; |
66 | 73 | this.sqsSettings = sqsSettings; |
67 | - admin = new TbAwsSqsAdmin(sqsSettings); | |
74 | + | |
75 | + this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); | |
76 | + this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); | |
77 | + this.jsExecutorAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getJsExecutorAttributes()); | |
78 | + this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes()); | |
79 | + this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes()); | |
68 | 80 | } |
69 | 81 | |
70 | 82 | @Override |
71 | 83 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsMsgProducer() { |
72 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic()); | |
84 | + return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic()); | |
73 | 85 | } |
74 | 86 | |
75 | 87 | @Override |
76 | 88 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
77 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); | |
89 | + return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic()); | |
78 | 90 | } |
79 | 91 | |
80 | 92 | @Override |
81 | 93 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
82 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); | |
94 | + return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic()); | |
83 | 95 | } |
84 | 96 | |
85 | 97 | @Override |
86 | 98 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() { |
87 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
99 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
88 | 100 | } |
89 | 101 | |
90 | 102 | @Override |
91 | 103 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
92 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
104 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
93 | 105 | } |
94 | 106 | |
95 | 107 | @Override |
96 | 108 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
97 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), | |
109 | + return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic(), | |
98 | 110 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); |
99 | 111 | } |
100 | 112 | |
101 | 113 | @Override |
102 | 114 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
103 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, | |
115 | + return new TbAwsSqsConsumerTemplate<>(notificationAdmin, sqsSettings, | |
104 | 116 | partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
105 | 117 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
106 | 118 | } |
107 | 119 | |
108 | 120 | @Override |
109 | 121 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createToCoreMsgConsumer() { |
110 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, coreSettings.getTopic(), | |
122 | + return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic(), | |
111 | 123 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); |
112 | 124 | } |
113 | 125 | |
114 | 126 | @Override |
115 | 127 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
116 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, | |
128 | + return new TbAwsSqsConsumerTemplate<>(notificationAdmin, sqsSettings, | |
117 | 129 | partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
118 | 130 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
119 | 131 | } |
120 | 132 | |
121 | 133 | @Override |
122 | 134 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
123 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), | |
135 | + return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic(), | |
124 | 136 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); |
125 | 137 | } |
126 | 138 | |
127 | 139 | @Override |
128 | 140 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiResponseProducer() { |
129 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getResponsesTopic()); | |
141 | + return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getResponsesTopic()); | |
130 | 142 | } |
131 | 143 | |
132 | 144 | @Override | ... | ... |
... | ... | @@ -40,6 +40,7 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
40 | 40 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; |
41 | 41 | import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; |
42 | 42 | import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; |
43 | +import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; | |
43 | 44 | import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; |
44 | 45 | |
45 | 46 | @Component |
... | ... | @@ -52,70 +53,81 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { |
52 | 53 | private final TbQueueTransportApiSettings transportApiSettings; |
53 | 54 | private final PartitionService partitionService; |
54 | 55 | private final TbServiceInfoProvider serviceInfoProvider; |
55 | - private final TbQueueAdmin admin; | |
56 | + | |
57 | + private final TbQueueAdmin coreAdmin; | |
58 | + private final TbQueueAdmin ruleEngineAdmin; | |
59 | + private final TbQueueAdmin jsExecutorAdmin; | |
60 | + private final TbQueueAdmin transportApiAdmin; | |
61 | + private final TbQueueAdmin notificationAdmin; | |
56 | 62 | |
57 | 63 | public AwsSqsTbCoreQueueFactory(TbAwsSqsSettings sqsSettings, |
58 | 64 | TbQueueCoreSettings coreSettings, |
59 | 65 | TbQueueTransportApiSettings transportApiSettings, |
60 | 66 | TbQueueRuleEngineSettings ruleEngineSettings, |
61 | 67 | PartitionService partitionService, |
62 | - TbServiceInfoProvider serviceInfoProvider) { | |
68 | + TbServiceInfoProvider serviceInfoProvider, | |
69 | + TbAwsSqsQueueAttributes sqsQueueAttributes) { | |
63 | 70 | this.sqsSettings = sqsSettings; |
64 | 71 | this.coreSettings = coreSettings; |
65 | 72 | this.transportApiSettings = transportApiSettings; |
66 | 73 | this.ruleEngineSettings = ruleEngineSettings; |
67 | 74 | this.partitionService = partitionService; |
68 | 75 | this.serviceInfoProvider = serviceInfoProvider; |
69 | - this.admin = new TbAwsSqsAdmin(sqsSettings); | |
76 | + | |
77 | + this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); | |
78 | + this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); | |
79 | + this.jsExecutorAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getJsExecutorAttributes()); | |
80 | + this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes()); | |
81 | + this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes()); | |
70 | 82 | } |
71 | 83 | |
72 | 84 | @Override |
73 | 85 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
74 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
86 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
75 | 87 | } |
76 | 88 | |
77 | 89 | @Override |
78 | 90 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
79 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
91 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
80 | 92 | } |
81 | 93 | |
82 | 94 | @Override |
83 | 95 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
84 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); | |
96 | + return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic()); | |
85 | 97 | } |
86 | 98 | |
87 | 99 | @Override |
88 | 100 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
89 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
101 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
90 | 102 | } |
91 | 103 | |
92 | 104 | @Override |
93 | 105 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
94 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
106 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
95 | 107 | } |
96 | 108 | |
97 | 109 | @Override |
98 | 110 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { |
99 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, coreSettings.getTopic(), | |
111 | + return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic(), | |
100 | 112 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); |
101 | 113 | } |
102 | 114 | |
103 | 115 | @Override |
104 | 116 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
105 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, | |
117 | + return new TbAwsSqsConsumerTemplate<>(notificationAdmin, sqsSettings, | |
106 | 118 | partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
107 | 119 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
108 | 120 | } |
109 | 121 | |
110 | 122 | @Override |
111 | 123 | public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
112 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic(), | |
124 | + return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic(), | |
113 | 125 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); |
114 | 126 | } |
115 | 127 | |
116 | 128 | @Override |
117 | 129 | public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { |
118 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
130 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
119 | 131 | } |
120 | 132 | |
121 | 133 | @Override | ... | ... |
common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbRuleEngineQueueFactory.java
... | ... | @@ -37,6 +37,7 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
37 | 37 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; |
38 | 38 | import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; |
39 | 39 | import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; |
40 | +import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; | |
40 | 41 | import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; |
41 | 42 | |
42 | 43 | @Component |
... | ... | @@ -48,54 +49,63 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory |
48 | 49 | private final TbServiceInfoProvider serviceInfoProvider; |
49 | 50 | private final TbQueueRuleEngineSettings ruleEngineSettings; |
50 | 51 | private final TbAwsSqsSettings sqsSettings; |
51 | - private final TbQueueAdmin admin; | |
52 | + | |
53 | + private final TbQueueAdmin coreAdmin; | |
54 | + private final TbQueueAdmin ruleEngineAdmin; | |
55 | + private final TbQueueAdmin jsExecutorAdmin; | |
56 | + private final TbQueueAdmin notificationAdmin; | |
52 | 57 | |
53 | 58 | public AwsSqsTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, |
54 | 59 | TbQueueRuleEngineSettings ruleEngineSettings, |
55 | 60 | TbServiceInfoProvider serviceInfoProvider, |
56 | - TbAwsSqsSettings sqsSettings) { | |
61 | + TbAwsSqsSettings sqsSettings, | |
62 | + TbAwsSqsQueueAttributes sqsQueueAttributes) { | |
57 | 63 | this.partitionService = partitionService; |
58 | 64 | this.coreSettings = coreSettings; |
59 | 65 | this.serviceInfoProvider = serviceInfoProvider; |
60 | 66 | this.ruleEngineSettings = ruleEngineSettings; |
61 | 67 | this.sqsSettings = sqsSettings; |
62 | - admin = new TbAwsSqsAdmin(sqsSettings); | |
68 | + | |
69 | + this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); | |
70 | + this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes()); | |
71 | + this.jsExecutorAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getJsExecutorAttributes()); | |
72 | + this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes()); | |
63 | 73 | } |
64 | 74 | |
65 | 75 | @Override |
66 | 76 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
67 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
77 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
68 | 78 | } |
69 | 79 | |
70 | 80 | @Override |
71 | 81 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
72 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
82 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
73 | 83 | } |
74 | 84 | |
75 | 85 | @Override |
76 | 86 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
77 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic()); | |
87 | + return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic()); | |
78 | 88 | } |
79 | 89 | |
80 | 90 | @Override |
81 | 91 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
82 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
92 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
83 | 93 | } |
84 | 94 | |
85 | 95 | @Override |
86 | 96 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
87 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, coreSettings.getTopic()); | |
97 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); | |
88 | 98 | } |
89 | 99 | |
90 | 100 | @Override |
91 | 101 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
92 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, ruleEngineSettings.getTopic(), | |
102 | + return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic(), | |
93 | 103 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); |
94 | 104 | } |
95 | 105 | |
96 | 106 | @Override |
97 | 107 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
98 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, | |
108 | + return new TbAwsSqsConsumerTemplate<>(notificationAdmin, sqsSettings, | |
99 | 109 | partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
100 | 110 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
101 | 111 | } | ... | ... |
... | ... | @@ -35,6 +35,7 @@ import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSetting |
35 | 35 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; |
36 | 36 | import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate; |
37 | 37 | import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate; |
38 | +import org.thingsboard.server.queue.sqs.TbAwsSqsQueueAttributes; | |
38 | 39 | import org.thingsboard.server.queue.sqs.TbAwsSqsSettings; |
39 | 40 | |
40 | 41 | @Component |
... | ... | @@ -44,33 +45,38 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { |
44 | 45 | private final TbQueueTransportApiSettings transportApiSettings; |
45 | 46 | private final TbQueueTransportNotificationSettings transportNotificationSettings; |
46 | 47 | private final TbAwsSqsSettings sqsSettings; |
47 | - private final TbQueueAdmin admin; | |
48 | 48 | private final TbServiceInfoProvider serviceInfoProvider; |
49 | 49 | |
50 | + private final TbQueueAdmin transportApiAdmin; | |
51 | + private final TbQueueAdmin notificationAdmin; | |
52 | + | |
50 | 53 | public AwsSqsTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, |
51 | 54 | TbQueueTransportNotificationSettings transportNotificationSettings, |
52 | 55 | TbAwsSqsSettings sqsSettings, |
53 | - TbServiceInfoProvider serviceInfoProvider) { | |
56 | + TbServiceInfoProvider serviceInfoProvider, | |
57 | + TbAwsSqsQueueAttributes sqsQueueAttributes) { | |
54 | 58 | this.transportApiSettings = transportApiSettings; |
55 | 59 | this.transportNotificationSettings = transportNotificationSettings; |
56 | 60 | this.sqsSettings = sqsSettings; |
57 | - admin = new TbAwsSqsAdmin(sqsSettings); | |
58 | 61 | this.serviceInfoProvider = serviceInfoProvider; |
62 | + | |
63 | + this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes()); | |
64 | + this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes()); | |
59 | 65 | } |
60 | 66 | |
61 | 67 | @Override |
62 | 68 | public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() { |
63 | 69 | TbAwsSqsProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate = |
64 | - new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); | |
70 | + new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic()); | |
65 | 71 | |
66 | 72 | TbAwsSqsConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate = |
67 | - new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, | |
73 | + new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, | |
68 | 74 | transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(), |
69 | 75 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); |
70 | 76 | |
71 | 77 | DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder |
72 | 78 | <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); |
73 | - templateBuilder.queueAdmin(admin); | |
79 | + templateBuilder.queueAdmin(transportApiAdmin); | |
74 | 80 | templateBuilder.requestTemplate(producerTemplate); |
75 | 81 | templateBuilder.responseTemplate(consumerTemplate); |
76 | 82 | templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); |
... | ... | @@ -81,17 +87,17 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { |
81 | 87 | |
82 | 88 | @Override |
83 | 89 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
84 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); | |
90 | + return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic()); | |
85 | 91 | } |
86 | 92 | |
87 | 93 | @Override |
88 | 94 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
89 | - return new TbAwsSqsProducerTemplate<>(admin, sqsSettings, transportApiSettings.getRequestsTopic()); | |
95 | + return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic()); | |
90 | 96 | } |
91 | 97 | |
92 | 98 | @Override |
93 | 99 | public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() { |
94 | - return new TbAwsSqsConsumerTemplate<>(admin, sqsSettings, transportNotificationSettings.getNotificationsTopic() + "_" + serviceInfoProvider.getServiceId(), | |
100 | + return new TbAwsSqsConsumerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic() + "_" + serviceInfoProvider.getServiceId(), | |
95 | 101 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); |
96 | 102 | } |
97 | 103 | } | ... | ... |
... | ... | @@ -23,6 +23,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; |
23 | 23 | import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; |
24 | 24 | import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; |
25 | 25 | import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; |
26 | +import org.thingsboard.server.queue.TbQueueAdmin; | |
26 | 27 | import org.thingsboard.server.queue.TbQueueConsumer; |
27 | 28 | import org.thingsboard.server.queue.TbQueueProducer; |
28 | 29 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
... | ... | @@ -33,6 +34,7 @@ import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; |
33 | 34 | import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; |
34 | 35 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
35 | 36 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
37 | +import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; | |
36 | 38 | |
37 | 39 | @Component |
38 | 40 | @ConditionalOnExpression("'${queue.type:null}'=='in-memory' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") |
... | ... | @@ -60,8 +62,15 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory |
60 | 62 | |
61 | 63 | DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder |
62 | 64 | <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); |
63 | - templateBuilder.queueAdmin(topic -> { | |
65 | + | |
66 | + templateBuilder.queueAdmin(new TbQueueAdmin() { | |
67 | + @Override | |
68 | + public void createTopicIfNotExists(String topic) {} | |
69 | + | |
70 | + @Override | |
71 | + public void destroy() {} | |
64 | 72 | }); |
73 | + | |
65 | 74 | templateBuilder.requestTemplate(producerTemplate); |
66 | 75 | templateBuilder.responseTemplate(consumerTemplate); |
67 | 76 | templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); | ... | ... |
... | ... | @@ -37,9 +37,9 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
37 | 37 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
38 | 38 | import org.thingsboard.server.queue.discovery.PartitionService; |
39 | 39 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
40 | -import org.thingsboard.server.queue.kafka.TBKafkaAdmin; | |
41 | -import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate; | |
42 | -import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate; | |
40 | +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; | |
41 | +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; | |
42 | +import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; | |
43 | 43 | import org.thingsboard.server.queue.kafka.TbKafkaSettings; |
44 | 44 | import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; |
45 | 45 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
... | ... | @@ -87,16 +87,16 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
87 | 87 | this.transportNotificationSettings = transportNotificationSettings; |
88 | 88 | this.jsInvokeSettings = jsInvokeSettings; |
89 | 89 | |
90 | - this.coreAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); | |
91 | - this.ruleEngineAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); | |
92 | - this.jsExecutorAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); | |
93 | - this.transportApiAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); | |
94 | - this.notificationAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); | |
90 | + this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); | |
91 | + this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); | |
92 | + this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); | |
93 | + this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); | |
94 | + this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); | |
95 | 95 | } |
96 | 96 | |
97 | 97 | @Override |
98 | 98 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
99 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
99 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
100 | 100 | requestBuilder.settings(kafkaSettings); |
101 | 101 | requestBuilder.clientId("monolith-transport-notifications-" + serviceInfoProvider.getServiceId()); |
102 | 102 | requestBuilder.defaultTopic(transportNotificationSettings.getNotificationsTopic()); |
... | ... | @@ -106,7 +106,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
106 | 106 | |
107 | 107 | @Override |
108 | 108 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
109 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
109 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
110 | 110 | requestBuilder.settings(kafkaSettings); |
111 | 111 | requestBuilder.clientId("monolith-rule-engine-" + serviceInfoProvider.getServiceId()); |
112 | 112 | requestBuilder.defaultTopic(ruleEngineSettings.getTopic()); |
... | ... | @@ -116,7 +116,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
116 | 116 | |
117 | 117 | @Override |
118 | 118 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
119 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
119 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
120 | 120 | requestBuilder.settings(kafkaSettings); |
121 | 121 | requestBuilder.clientId("monolith-rule-engine-notifications-" + serviceInfoProvider.getServiceId()); |
122 | 122 | requestBuilder.defaultTopic(ruleEngineSettings.getTopic()); |
... | ... | @@ -126,7 +126,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
126 | 126 | |
127 | 127 | @Override |
128 | 128 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
129 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
129 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
130 | 130 | requestBuilder.settings(kafkaSettings); |
131 | 131 | requestBuilder.clientId("monolith-core-" + serviceInfoProvider.getServiceId()); |
132 | 132 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -136,7 +136,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
136 | 136 | |
137 | 137 | @Override |
138 | 138 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
139 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
139 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
140 | 140 | requestBuilder.settings(kafkaSettings); |
141 | 141 | requestBuilder.clientId("monolith-core-notifications-" + serviceInfoProvider.getServiceId()); |
142 | 142 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -147,7 +147,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
147 | 147 | @Override |
148 | 148 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
149 | 149 | String queueName = configuration.getName(); |
150 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
150 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
151 | 151 | consumerBuilder.settings(kafkaSettings); |
152 | 152 | consumerBuilder.topic(ruleEngineSettings.getTopic()); |
153 | 153 | consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -159,7 +159,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
159 | 159 | |
160 | 160 | @Override |
161 | 161 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
162 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
162 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
163 | 163 | consumerBuilder.settings(kafkaSettings); |
164 | 164 | consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); |
165 | 165 | consumerBuilder.clientId("monolith-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -171,7 +171,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
171 | 171 | |
172 | 172 | @Override |
173 | 173 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { |
174 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
174 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
175 | 175 | consumerBuilder.settings(kafkaSettings); |
176 | 176 | consumerBuilder.topic(coreSettings.getTopic()); |
177 | 177 | consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -183,7 +183,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
183 | 183 | |
184 | 184 | @Override |
185 | 185 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
186 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
186 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
187 | 187 | consumerBuilder.settings(kafkaSettings); |
188 | 188 | consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); |
189 | 189 | consumerBuilder.clientId("monolith-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -195,7 +195,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
195 | 195 | |
196 | 196 | @Override |
197 | 197 | public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
198 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
198 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
199 | 199 | consumerBuilder.settings(kafkaSettings); |
200 | 200 | consumerBuilder.topic(transportApiSettings.getRequestsTopic()); |
201 | 201 | consumerBuilder.clientId("monolith-transport-api-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -207,7 +207,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
207 | 207 | |
208 | 208 | @Override |
209 | 209 | public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { |
210 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
210 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
211 | 211 | requestBuilder.settings(kafkaSettings); |
212 | 212 | requestBuilder.clientId("monolith-transport-api-producer-" + serviceInfoProvider.getServiceId()); |
213 | 213 | requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic()); |
... | ... | @@ -218,13 +218,13 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
218 | 218 | @Override |
219 | 219 | @Bean |
220 | 220 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
221 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
221 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
222 | 222 | requestBuilder.settings(kafkaSettings); |
223 | 223 | requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId()); |
224 | 224 | requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic()); |
225 | 225 | requestBuilder.admin(jsExecutorAdmin); |
226 | 226 | |
227 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder(); | |
227 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder(); | |
228 | 228 | responseBuilder.settings(kafkaSettings); |
229 | 229 | responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); |
230 | 230 | responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); | ... | ... |
... | ... | @@ -37,9 +37,9 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
37 | 37 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
38 | 38 | import org.thingsboard.server.queue.discovery.PartitionService; |
39 | 39 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
40 | -import org.thingsboard.server.queue.kafka.TBKafkaAdmin; | |
41 | -import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate; | |
42 | -import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate; | |
40 | +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; | |
41 | +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; | |
42 | +import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; | |
43 | 43 | import org.thingsboard.server.queue.kafka.TbKafkaSettings; |
44 | 44 | import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; |
45 | 45 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
... | ... | @@ -82,16 +82,16 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
82 | 82 | this.transportApiSettings = transportApiSettings; |
83 | 83 | this.jsInvokeSettings = jsInvokeSettings; |
84 | 84 | |
85 | - this.coreAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); | |
86 | - this.ruleEngineAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); | |
87 | - this.jsExecutorAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); | |
88 | - this.transportApiAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); | |
89 | - this.notificationAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); | |
85 | + this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); | |
86 | + this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); | |
87 | + this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); | |
88 | + this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); | |
89 | + this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); | |
90 | 90 | } |
91 | 91 | |
92 | 92 | @Override |
93 | 93 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
94 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
94 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
95 | 95 | requestBuilder.settings(kafkaSettings); |
96 | 96 | requestBuilder.clientId("tb-core-transport-notifications-" + serviceInfoProvider.getServiceId()); |
97 | 97 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -101,7 +101,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
101 | 101 | |
102 | 102 | @Override |
103 | 103 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
104 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
104 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
105 | 105 | requestBuilder.settings(kafkaSettings); |
106 | 106 | requestBuilder.clientId("tb-core-rule-engine-" + serviceInfoProvider.getServiceId()); |
107 | 107 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -111,7 +111,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
111 | 111 | |
112 | 112 | @Override |
113 | 113 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
114 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
114 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
115 | 115 | requestBuilder.settings(kafkaSettings); |
116 | 116 | requestBuilder.clientId("tb-core-rule-engine-notifications-" + serviceInfoProvider.getServiceId()); |
117 | 117 | requestBuilder.defaultTopic(ruleEngineSettings.getTopic()); |
... | ... | @@ -121,7 +121,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
121 | 121 | |
122 | 122 | @Override |
123 | 123 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
124 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
124 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
125 | 125 | requestBuilder.settings(kafkaSettings); |
126 | 126 | requestBuilder.clientId("tb-core-to-core-" + serviceInfoProvider.getServiceId()); |
127 | 127 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -131,7 +131,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
131 | 131 | |
132 | 132 | @Override |
133 | 133 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
134 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
134 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
135 | 135 | requestBuilder.settings(kafkaSettings); |
136 | 136 | requestBuilder.clientId("tb-core-to-core-notifications-" + serviceInfoProvider.getServiceId()); |
137 | 137 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -141,7 +141,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
141 | 141 | |
142 | 142 | @Override |
143 | 143 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { |
144 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
144 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
145 | 145 | consumerBuilder.settings(kafkaSettings); |
146 | 146 | consumerBuilder.topic(coreSettings.getTopic()); |
147 | 147 | consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -153,7 +153,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
153 | 153 | |
154 | 154 | @Override |
155 | 155 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
156 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
156 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
157 | 157 | consumerBuilder.settings(kafkaSettings); |
158 | 158 | consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName()); |
159 | 159 | consumerBuilder.clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -165,7 +165,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
165 | 165 | |
166 | 166 | @Override |
167 | 167 | public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
168 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
168 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
169 | 169 | consumerBuilder.settings(kafkaSettings); |
170 | 170 | consumerBuilder.topic(transportApiSettings.getRequestsTopic()); |
171 | 171 | consumerBuilder.clientId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -177,7 +177,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
177 | 177 | |
178 | 178 | @Override |
179 | 179 | public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { |
180 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
180 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
181 | 181 | requestBuilder.settings(kafkaSettings); |
182 | 182 | requestBuilder.clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId()); |
183 | 183 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -188,13 +188,13 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
188 | 188 | @Override |
189 | 189 | @Bean |
190 | 190 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
191 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
191 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
192 | 192 | requestBuilder.settings(kafkaSettings); |
193 | 193 | requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId()); |
194 | 194 | requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic()); |
195 | 195 | requestBuilder.admin(jsExecutorAdmin); |
196 | 196 | |
197 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder(); | |
197 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder(); | |
198 | 198 | responseBuilder.settings(kafkaSettings); |
199 | 199 | responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); |
200 | 200 | responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); | ... | ... |
... | ... | @@ -35,9 +35,9 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
35 | 35 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
36 | 36 | import org.thingsboard.server.queue.discovery.PartitionService; |
37 | 37 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
38 | -import org.thingsboard.server.queue.kafka.TBKafkaAdmin; | |
39 | -import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate; | |
40 | -import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate; | |
38 | +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; | |
39 | +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; | |
40 | +import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; | |
41 | 41 | import org.thingsboard.server.queue.kafka.TbKafkaSettings; |
42 | 42 | import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; |
43 | 43 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
... | ... | @@ -76,15 +76,15 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
76 | 76 | this.ruleEngineSettings = ruleEngineSettings; |
77 | 77 | this.jsInvokeSettings = jsInvokeSettings; |
78 | 78 | |
79 | - this.coreAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); | |
80 | - this.ruleEngineAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); | |
81 | - this.jsExecutorAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); | |
82 | - this.notificationAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); | |
79 | + this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); | |
80 | + this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); | |
81 | + this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); | |
82 | + this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); | |
83 | 83 | } |
84 | 84 | |
85 | 85 | @Override |
86 | 86 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
87 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
87 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
88 | 88 | requestBuilder.settings(kafkaSettings); |
89 | 89 | requestBuilder.clientId("tb-rule-engine-transport-notifications-" + serviceInfoProvider.getServiceId()); |
90 | 90 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -94,7 +94,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
94 | 94 | |
95 | 95 | @Override |
96 | 96 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
97 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
97 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
98 | 98 | requestBuilder.settings(kafkaSettings); |
99 | 99 | requestBuilder.clientId("tb-rule-engine-to-rule-engine-" + serviceInfoProvider.getServiceId()); |
100 | 100 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -104,7 +104,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
104 | 104 | |
105 | 105 | @Override |
106 | 106 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
107 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
107 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
108 | 108 | requestBuilder.settings(kafkaSettings); |
109 | 109 | requestBuilder.clientId("tb-rule-engine-to-rule-engine-notifications-" + serviceInfoProvider.getServiceId()); |
110 | 110 | requestBuilder.defaultTopic(ruleEngineSettings.getTopic()); |
... | ... | @@ -115,7 +115,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
115 | 115 | |
116 | 116 | @Override |
117 | 117 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
118 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
118 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
119 | 119 | requestBuilder.settings(kafkaSettings); |
120 | 120 | requestBuilder.clientId("tb-rule-engine-to-core-" + serviceInfoProvider.getServiceId()); |
121 | 121 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -125,7 +125,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
125 | 125 | |
126 | 126 | @Override |
127 | 127 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
128 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
128 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreNotificationMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
129 | 129 | requestBuilder.settings(kafkaSettings); |
130 | 130 | requestBuilder.clientId("tb-rule-engine-to-core-notifications-" + serviceInfoProvider.getServiceId()); |
131 | 131 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -136,7 +136,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
136 | 136 | @Override |
137 | 137 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
138 | 138 | String queueName = configuration.getName(); |
139 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
139 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
140 | 140 | consumerBuilder.settings(kafkaSettings); |
141 | 141 | consumerBuilder.topic(ruleEngineSettings.getTopic()); |
142 | 142 | consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -148,7 +148,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
148 | 148 | |
149 | 149 | @Override |
150 | 150 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
151 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> consumerBuilder = TBKafkaConsumerTemplate.builder(); | |
151 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder(); | |
152 | 152 | consumerBuilder.settings(kafkaSettings); |
153 | 153 | consumerBuilder.topic(partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName()); |
154 | 154 | consumerBuilder.clientId("tb-rule-engine-notifications-consumer-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -161,13 +161,13 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
161 | 161 | @Override |
162 | 162 | @Bean |
163 | 163 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
164 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
164 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
165 | 165 | requestBuilder.settings(kafkaSettings); |
166 | 166 | requestBuilder.clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId()); |
167 | 167 | requestBuilder.defaultTopic(jsInvokeSettings.getRequestTopic()); |
168 | 168 | requestBuilder.admin(jsExecutorAdmin); |
169 | 169 | |
170 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TBKafkaConsumerTemplate.builder(); | |
170 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder(); | |
171 | 171 | responseBuilder.settings(kafkaSettings); |
172 | 172 | responseBuilder.topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId()); |
173 | 173 | responseBuilder.clientId("js-" + serviceInfoProvider.getServiceId()); | ... | ... |
... | ... | @@ -30,9 +30,9 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; |
30 | 30 | import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; |
31 | 31 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
32 | 32 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
33 | -import org.thingsboard.server.queue.kafka.TBKafkaAdmin; | |
34 | -import org.thingsboard.server.queue.kafka.TBKafkaConsumerTemplate; | |
35 | -import org.thingsboard.server.queue.kafka.TBKafkaProducerTemplate; | |
33 | +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; | |
34 | +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; | |
35 | +import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; | |
36 | 36 | import org.thingsboard.server.queue.kafka.TbKafkaSettings; |
37 | 37 | import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; |
38 | 38 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
... | ... | @@ -71,21 +71,21 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { |
71 | 71 | this.transportApiSettings = transportApiSettings; |
72 | 72 | this.transportNotificationSettings = transportNotificationSettings; |
73 | 73 | |
74 | - this.coreAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); | |
75 | - this.ruleEngineAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); | |
76 | - this.transportApiAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); | |
77 | - this.notificationAdmin = new TBKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); | |
74 | + this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); | |
75 | + this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); | |
76 | + this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); | |
77 | + this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); | |
78 | 78 | } |
79 | 79 | |
80 | 80 | @Override |
81 | 81 | public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() { |
82 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
82 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiRequestMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
83 | 83 | requestBuilder.settings(kafkaSettings); |
84 | 84 | requestBuilder.clientId("transport-api-request-" + serviceInfoProvider.getServiceId()); |
85 | 85 | requestBuilder.defaultTopic(transportApiSettings.getRequestsTopic()); |
86 | 86 | requestBuilder.admin(transportApiAdmin); |
87 | 87 | |
88 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> responseBuilder = TBKafkaConsumerTemplate.builder(); | |
88 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> responseBuilder = TbKafkaConsumerTemplate.builder(); | |
89 | 89 | responseBuilder.settings(kafkaSettings); |
90 | 90 | responseBuilder.topic(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()); |
91 | 91 | responseBuilder.clientId("transport-api-response-" + serviceInfoProvider.getServiceId()); |
... | ... | @@ -106,7 +106,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { |
106 | 106 | |
107 | 107 | @Override |
108 | 108 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
109 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
109 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
110 | 110 | requestBuilder.settings(kafkaSettings); |
111 | 111 | requestBuilder.clientId("transport-node-rule-engine-" + serviceInfoProvider.getServiceId()); |
112 | 112 | requestBuilder.defaultTopic(ruleEngineSettings.getTopic()); |
... | ... | @@ -116,7 +116,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { |
116 | 116 | |
117 | 117 | @Override |
118 | 118 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
119 | - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TBKafkaProducerTemplate.builder(); | |
119 | + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); | |
120 | 120 | requestBuilder.settings(kafkaSettings); |
121 | 121 | requestBuilder.clientId("transport-node-core-" + serviceInfoProvider.getServiceId()); |
122 | 122 | requestBuilder.defaultTopic(coreSettings.getTopic()); |
... | ... | @@ -126,7 +126,7 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { |
126 | 126 | |
127 | 127 | @Override |
128 | 128 | public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() { |
129 | - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> responseBuilder = TBKafkaConsumerTemplate.builder(); | |
129 | + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> responseBuilder = TbKafkaConsumerTemplate.builder(); | |
130 | 130 | responseBuilder.settings(kafkaSettings); |
131 | 131 | responseBuilder.topic(transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId()); |
132 | 132 | responseBuilder.clientId("transport-api-notifications-" + serviceInfoProvider.getServiceId()); | ... | ... |
... | ... | @@ -121,6 +121,11 @@ public class TbPubSubAdmin implements TbQueueAdmin { |
121 | 121 | } |
122 | 122 | } |
123 | 123 | |
124 | + @Override | |
125 | + public void destroy() { | |
126 | + | |
127 | + } | |
128 | + | |
124 | 129 | private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) { |
125 | 130 | ProjectSubscriptionName subscriptionName = |
126 | 131 | ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition); | ... | ... |
... | ... | @@ -14,15 +14,14 @@ |
14 | 14 | * limitations under the License. |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.queue.rabbitmq; |
17 | + | |
17 | 18 | import com.rabbitmq.client.Channel; |
18 | 19 | import com.rabbitmq.client.Connection; |
19 | - | |
20 | 20 | import lombok.extern.slf4j.Slf4j; |
21 | 21 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
22 | 22 | import org.springframework.stereotype.Component; |
23 | 23 | import org.thingsboard.server.queue.TbQueueAdmin; |
24 | 24 | |
25 | -import javax.annotation.PreDestroy; | |
26 | 25 | import java.io.IOException; |
27 | 26 | import java.util.concurrent.TimeoutException; |
28 | 27 | |
... | ... | @@ -62,8 +61,8 @@ public class TbRabbitMqAdmin implements TbQueueAdmin { |
62 | 61 | } |
63 | 62 | } |
64 | 63 | |
65 | - @PreDestroy | |
66 | - private void destroy() { | |
64 | + @Override | |
65 | + public void destroy() { | |
67 | 66 | if (channel != null) { |
68 | 67 | try { |
69 | 68 | channel.close(); | ... | ... |
... | ... | @@ -21,45 +21,56 @@ import com.amazonaws.auth.BasicAWSCredentials; |
21 | 21 | import com.amazonaws.services.sqs.AmazonSQS; |
22 | 22 | import com.amazonaws.services.sqs.AmazonSQSClientBuilder; |
23 | 23 | import com.amazonaws.services.sqs.model.CreateQueueRequest; |
24 | -import com.amazonaws.services.sqs.model.QueueAttributeName; | |
25 | 24 | import org.thingsboard.server.queue.TbQueueAdmin; |
26 | 25 | |
27 | -import java.util.HashMap; | |
28 | 26 | import java.util.Map; |
27 | +import java.util.Set; | |
28 | +import java.util.concurrent.ConcurrentHashMap; | |
29 | +import java.util.stream.Collectors; | |
29 | 30 | |
30 | 31 | public class TbAwsSqsAdmin implements TbQueueAdmin { |
31 | 32 | |
32 | - private final TbAwsSqsSettings sqsSettings; | |
33 | - private final Map<String, String> attributes = new HashMap<>(); | |
34 | - private final AWSStaticCredentialsProvider credProvider; | |
33 | + private final Map<String, String> attributes; | |
34 | + private final AmazonSQS sqsClient; | |
35 | + private final Set<String> queues; | |
35 | 36 | |
36 | - public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings) { | |
37 | - this.sqsSettings = sqsSettings; | |
37 | + public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) { | |
38 | + this.attributes = attributes; | |
38 | 39 | |
39 | 40 | AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); |
40 | - this.credProvider = new AWSStaticCredentialsProvider(awsCredentials); | |
41 | + sqsClient = AmazonSQSClientBuilder.standard() | |
42 | + .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) | |
43 | + .withRegion(sqsSettings.getRegion()) | |
44 | + .build(); | |
41 | 45 | |
42 | - attributes.put("FifoQueue", "true"); | |
43 | - attributes.put("ContentBasedDeduplication", "true"); | |
44 | - attributes.put(QueueAttributeName.VisibilityTimeout.toString(), sqsSettings.getVisibilityTimeout()); | |
46 | + queues = sqsClient | |
47 | + .listQueues() | |
48 | + .getQueueUrls() | |
49 | + .stream() | |
50 | + .map(this::getQueueNameFromUrl) | |
51 | + .collect(Collectors.toCollection(ConcurrentHashMap::newKeySet)); | |
45 | 52 | } |
46 | 53 | |
47 | 54 | @Override |
48 | 55 | public void createTopicIfNotExists(String topic) { |
49 | - AmazonSQS sqsClient = AmazonSQSClientBuilder.standard() | |
50 | - .withCredentials(credProvider) | |
51 | - .withRegion(sqsSettings.getRegion()) | |
52 | - .build(); | |
56 | + String queueName = topic.replaceAll("\\.", "_") + ".fifo"; | |
57 | + if (queues.contains(queueName)) { | |
58 | + return; | |
59 | + } | |
60 | + final CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName).withAttributes(attributes); | |
61 | + String queueUrl = sqsClient.createQueue(createQueueRequest).getQueueUrl(); | |
62 | + queues.add(getQueueNameFromUrl(queueUrl)); | |
63 | + } | |
64 | + | |
65 | + private String getQueueNameFromUrl(String queueUrl) { | |
66 | + int delimiterIndex = queueUrl.lastIndexOf("/"); | |
67 | + return queueUrl.substring(delimiterIndex + 1); | |
68 | + } | |
53 | 69 | |
54 | - final CreateQueueRequest createQueueRequest = | |
55 | - new CreateQueueRequest(topic.replaceAll("\\.", "_") + ".fifo") | |
56 | - .withAttributes(attributes); | |
57 | - try { | |
58 | - sqsClient.createQueue(createQueueRequest); | |
59 | - } finally { | |
60 | - if (sqsClient != null) { | |
61 | - sqsClient.shutdown(); | |
62 | - } | |
70 | + @Override | |
71 | + public void destroy() { | |
72 | + if (sqsClient != null) { | |
73 | + sqsClient.shutdown(); | |
63 | 74 | } |
64 | 75 | } |
65 | 76 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.queue.sqs; | |
17 | + | |
18 | +import com.amazonaws.services.sqs.model.QueueAttributeName; | |
19 | +import lombok.Getter; | |
20 | +import org.springframework.beans.factory.annotation.Value; | |
21 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | |
22 | +import org.springframework.stereotype.Component; | |
23 | + | |
24 | +import javax.annotation.PostConstruct; | |
25 | +import java.util.HashMap; | |
26 | +import java.util.Map; | |
27 | + | |
28 | +@Component | |
29 | +@ConditionalOnExpression("'${queue.type:null}'=='aws-sqs'") | |
30 | +public class TbAwsSqsQueueAttributes { | |
31 | + @Value("${queue.aws-sqs.queue-properties.core}") | |
32 | + private String coreProperties; | |
33 | + @Value("${queue.aws-sqs.queue-properties.rule-engine}") | |
34 | + private String ruleEngineProperties; | |
35 | + @Value("${queue.aws-sqs.queue-properties.transport-api}") | |
36 | + private String transportApiProperties; | |
37 | + @Value("${queue.aws-sqs.queue-properties.notifications}") | |
38 | + private String notificationsProperties; | |
39 | + @Value("${queue.aws-sqs.queue-properties.js-executor}") | |
40 | + private String jsExecutorProperties; | |
41 | + | |
42 | + @Getter | |
43 | + private Map<String, String> coreAttributes; | |
44 | + @Getter | |
45 | + private Map<String, String> ruleEngineAttributes; | |
46 | + @Getter | |
47 | + private Map<String, String> transportApiAttributes; | |
48 | + @Getter | |
49 | + private Map<String, String> notificationsAttributes; | |
50 | + @Getter | |
51 | + private Map<String, String> jsExecutorAttributes; | |
52 | + | |
53 | + private final Map<String, String> defaultAttributes = new HashMap<>(); | |
54 | + | |
55 | + @PostConstruct | |
56 | + private void init() { | |
57 | + defaultAttributes.put(QueueAttributeName.FifoQueue.toString(), "true"); | |
58 | + defaultAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true"); | |
59 | + | |
60 | + coreAttributes = getConfigs(coreProperties); | |
61 | + ruleEngineAttributes = getConfigs(ruleEngineProperties); | |
62 | + transportApiAttributes = getConfigs(transportApiProperties); | |
63 | + notificationsAttributes = getConfigs(notificationsProperties); | |
64 | + jsExecutorAttributes = getConfigs(jsExecutorProperties); | |
65 | + } | |
66 | + | |
67 | + private Map<String, String> getConfigs(String properties) { | |
68 | + Map<String, String> configs = new HashMap<>(); | |
69 | + for (String property : properties.split(";")) { | |
70 | + int delimiterPosition = property.indexOf(":"); | |
71 | + String key = property.substring(0, delimiterPosition); | |
72 | + String value = property.substring(delimiterPosition + 1); | |
73 | + validateAttributeName(key); | |
74 | + configs.put(key, value); | |
75 | + } | |
76 | + configs.putAll(defaultAttributes); | |
77 | + return configs; | |
78 | + } | |
79 | + | |
80 | + private void validateAttributeName(String key) { | |
81 | + QueueAttributeName.fromValue(key); | |
82 | + } | |
83 | +} | ... | ... |
... | ... | @@ -62,7 +62,13 @@ queue: |
62 | 62 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
63 | 63 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |
64 | 64 | threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" |
65 | - visibility_timeout: "${TB_QUEUE_AWS_SQS_VISIBILITY_TIMEOUT:30}" #In seconds. If messages wont commit in this time, messages will poll again | |
65 | + queue-properties: | |
66 | + rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
67 | + core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
68 | + transport-api: "${TB_QUEUE_AWS_SQS_TA_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
69 | + notifications: "${TB_QUEUE_AWS_SQS_NOTIFICATIONS_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
70 | + js-executor: "${TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
71 | + # VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds | |
66 | 72 | pubsub: |
67 | 73 | project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" |
68 | 74 | service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" | ... | ... |
... | ... | @@ -63,7 +63,13 @@ queue: |
63 | 63 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
64 | 64 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |
65 | 65 | threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" |
66 | - visibility_timeout: "${TB_QUEUE_AWS_SQS_VISIBILITY_TIMEOUT:30}" #In seconds. If messages wont commit in this time, messages will poll again | |
66 | + queue-properties: | |
67 | + rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
68 | + core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
69 | + transport-api: "${TB_QUEUE_AWS_SQS_TA_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
70 | + notifications: "${TB_QUEUE_AWS_SQS_NOTIFICATIONS_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
71 | + js-executor: "${TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
72 | + # VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds | |
67 | 73 | pubsub: |
68 | 74 | project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" |
69 | 75 | service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" | ... | ... |
... | ... | @@ -93,7 +93,13 @@ queue: |
93 | 93 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
94 | 94 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |
95 | 95 | threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" |
96 | - visibility_timeout: "${TB_QUEUE_AWS_SQS_VISIBILITY_TIMEOUT:30}" #In seconds. If messages wont commit in this time, messages will poll again | |
96 | + queue-properties: | |
97 | + rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
98 | + core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
99 | + transport-api: "${TB_QUEUE_AWS_SQS_TA_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
100 | + notifications: "${TB_QUEUE_AWS_SQS_NOTIFICATIONS_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
101 | + js-executor: "${TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}" | |
102 | + # VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds | |
97 | 103 | pubsub: |
98 | 104 | project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" |
99 | 105 | service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" | ... | ... |