Showing
24 changed files
with
422 additions
and
65 deletions
... | ... | @@ -36,6 +36,7 @@ |
36 | 36 | |
37 | 37 | <logger name="org.thingsboard.server" level="INFO" /> |
38 | 38 | <logger name="akka" level="INFO" /> |
39 | + <logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" /> | |
39 | 40 | |
40 | 41 | <root level="INFO"> |
41 | 42 | <appender-ref ref="fileLogAppender"/> | ... | ... |
... | ... | @@ -31,8 +31,11 @@ |
31 | 31 | <!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />--> |
32 | 32 | <!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />--> |
33 | 33 | |
34 | + <logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" /> | |
35 | + | |
34 | 36 | <root level="INFO"> |
35 | 37 | <appender-ref ref="STDOUT"/> |
36 | 38 | </root> |
37 | 39 | |
40 | + | |
38 | 41 | </configuration> |
\ No newline at end of file | ... | ... |
... | ... | @@ -563,6 +563,12 @@ queue: |
563 | 563 | sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" |
564 | 564 | sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" |
565 | 565 | max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" |
566 | + queue-properties: | |
567 | + rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
568 | + core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
569 | + transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
570 | + notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
571 | + js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
566 | 572 | rabbitmq: |
567 | 573 | exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" |
568 | 574 | host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" | ... | ... |
... | ... | @@ -16,27 +16,32 @@ |
16 | 16 | package org.thingsboard.server.queue.azure.servicebus; |
17 | 17 | |
18 | 18 | import com.microsoft.azure.servicebus.management.ManagementClient; |
19 | +import com.microsoft.azure.servicebus.management.QueueDescription; | |
19 | 20 | import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; |
20 | 21 | import com.microsoft.azure.servicebus.primitives.ServiceBusException; |
21 | 22 | import lombok.extern.slf4j.Slf4j; |
22 | -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | |
23 | -import org.springframework.stereotype.Component; | |
24 | 23 | import org.thingsboard.server.queue.TbQueueAdmin; |
25 | 24 | |
26 | 25 | import java.io.IOException; |
26 | +import java.time.Duration; | |
27 | +import java.util.Map; | |
27 | 28 | import java.util.Set; |
28 | 29 | import java.util.concurrent.ConcurrentHashMap; |
29 | 30 | |
30 | 31 | @Slf4j |
31 | -@Component | |
32 | -@ConditionalOnExpression("'${queue.type:null}'=='service-bus'") | |
33 | 32 | public class TbServiceBusAdmin implements TbQueueAdmin { |
33 | + private final String MAX_SIZE = "maxSizeInMb"; | |
34 | + private final String MESSAGE_TIME_TO_LIVE = "messageTimeToLiveInSec"; | |
35 | + private final String LOCK_DURATION = "lockDurationInSec"; | |
34 | 36 | |
37 | + private final Map<String, String> queueConfigs; | |
35 | 38 | private final Set<String> queues = ConcurrentHashMap.newKeySet(); |
36 | 39 | |
37 | 40 | private final ManagementClient client; |
38 | 41 | |
39 | - public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings) { | |
42 | + public TbServiceBusAdmin(TbServiceBusSettings serviceBusSettings, Map<String, String> queueConfigs) { | |
43 | + this.queueConfigs = queueConfigs; | |
44 | + | |
40 | 45 | ConnectionStringBuilder builder = new ConnectionStringBuilder( |
41 | 46 | serviceBusSettings.getNamespaceName(), |
42 | 47 | "queues", |
... | ... | @@ -60,13 +65,34 @@ public class TbServiceBusAdmin implements TbQueueAdmin { |
60 | 65 | } |
61 | 66 | |
62 | 67 | try { |
63 | - client.createQueue(topic); | |
68 | + QueueDescription queueDescription = new QueueDescription(topic); | |
69 | + setQueueConfigs(queueDescription); | |
70 | + | |
71 | + client.createQueue(queueDescription); | |
64 | 72 | queues.add(topic); |
65 | 73 | } catch (ServiceBusException | InterruptedException e) { |
66 | 74 | log.error("Failed to create queue: [{}]", topic, e); |
67 | 75 | } |
68 | 76 | } |
69 | 77 | |
78 | + private void setQueueConfigs(QueueDescription queueDescription) { | |
79 | + queueConfigs.forEach((confKey, confValue) -> { | |
80 | + switch (confKey) { | |
81 | + case MAX_SIZE: | |
82 | + queueDescription.setMaxSizeInMB(Long.parseLong(confValue)); | |
83 | + break; | |
84 | + case MESSAGE_TIME_TO_LIVE: | |
85 | + queueDescription.setDefaultMessageTimeToLive(Duration.ofSeconds(Long.parseLong(confValue))); | |
86 | + break; | |
87 | + case LOCK_DURATION: | |
88 | + queueDescription.setLockDuration(Duration.ofSeconds(Long.parseLong(confValue))); | |
89 | + break; | |
90 | + default: | |
91 | + log.error("Unknown config: [{}]", confKey); | |
92 | + } | |
93 | + }); | |
94 | + } | |
95 | + | |
70 | 96 | public void destroy() { |
71 | 97 | try { |
72 | 98 | client.close(); | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2020 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.queue.azure.servicebus; | |
17 | + | |
18 | +import lombok.Getter; | |
19 | +import org.springframework.beans.factory.annotation.Value; | |
20 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | |
21 | +import org.springframework.stereotype.Component; | |
22 | + | |
23 | +import javax.annotation.PostConstruct; | |
24 | +import java.util.HashMap; | |
25 | +import java.util.Map; | |
26 | + | |
27 | +@Component | |
28 | +@ConditionalOnExpression("'${queue.type:null}'=='service-bus'") | |
29 | +public class TbServiceBusQueueConfigs { | |
30 | + @Value("${queue.service-bus.queue-properties.core}") | |
31 | + private String coreProperties; | |
32 | + @Value("${queue.service-bus.queue-properties.rule-engine}") | |
33 | + private String ruleEngineProperties; | |
34 | + @Value("${queue.service-bus.queue-properties.transport-api}") | |
35 | + private String transportApiProperties; | |
36 | + @Value("${queue.service-bus.queue-properties.notifications}") | |
37 | + private String notificationsProperties; | |
38 | + @Value("${queue.service-bus.queue-properties.js-executor}") | |
39 | + private String jsExecutorProperties; | |
40 | + | |
41 | + @Getter | |
42 | + private Map<String, String> coreConfigs; | |
43 | + @Getter | |
44 | + private Map<String, String> ruleEngineConfigs; | |
45 | + @Getter | |
46 | + private Map<String, String> transportApiConfigs; | |
47 | + @Getter | |
48 | + private Map<String, String> notificationsConfigs; | |
49 | + @Getter | |
50 | + private Map<String, String> jsExecutorConfigs; | |
51 | + | |
52 | + @PostConstruct | |
53 | + private void init() { | |
54 | + coreConfigs = getConfigs(coreProperties); | |
55 | + ruleEngineConfigs = getConfigs(ruleEngineProperties); | |
56 | + transportApiConfigs = getConfigs(transportApiProperties); | |
57 | + notificationsConfigs = getConfigs(notificationsProperties); | |
58 | + jsExecutorConfigs = getConfigs(jsExecutorProperties); | |
59 | + } | |
60 | + | |
61 | + private Map<String, String> getConfigs(String properties) { | |
62 | + Map<String, String> configs = new HashMap<>(); | |
63 | + for (String property : properties.split(";")) { | |
64 | + int delimiterPosition = property.indexOf(":"); | |
65 | + String key = property.substring(0, delimiterPosition); | |
66 | + String value = property.substring(delimiterPosition + 1); | |
67 | + configs.put(key, value); | |
68 | + } | |
69 | + return configs; | |
70 | + } | |
71 | +} | ... | ... |
... | ... | @@ -49,6 +49,7 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
49 | 49 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
50 | 50 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
51 | 51 | |
52 | +import javax.annotation.PreDestroy; | |
52 | 53 | import java.nio.charset.StandardCharsets; |
53 | 54 | |
54 | 55 | @Component |
... | ... | @@ -247,4 +248,23 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi |
247 | 248 | builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); |
248 | 249 | return builder.build(); |
249 | 250 | } |
251 | + | |
252 | + @PreDestroy | |
253 | + private void destroy() { | |
254 | + if (coreAdmin != null) { | |
255 | + coreAdmin.destroy(); | |
256 | + } | |
257 | + if (ruleEngineAdmin != null) { | |
258 | + ruleEngineAdmin.destroy(); | |
259 | + } | |
260 | + if (jsExecutorAdmin != null) { | |
261 | + jsExecutorAdmin.destroy(); | |
262 | + } | |
263 | + if (transportApiAdmin != null) { | |
264 | + transportApiAdmin.destroy(); | |
265 | + } | |
266 | + if (notificationAdmin != null) { | |
267 | + notificationAdmin.destroy(); | |
268 | + } | |
269 | + } | |
250 | 270 | } | ... | ... |
... | ... | @@ -47,6 +47,7 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; |
47 | 47 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
48 | 48 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
49 | 49 | |
50 | +import javax.annotation.PreDestroy; | |
50 | 51 | import java.nio.charset.StandardCharsets; |
51 | 52 | |
52 | 53 | @Component |
... | ... | @@ -218,4 +219,22 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { |
218 | 219 | return builder.build(); |
219 | 220 | } |
220 | 221 | |
222 | + @PreDestroy | |
223 | + private void destroy() { | |
224 | + if (coreAdmin != null) { | |
225 | + coreAdmin.destroy(); | |
226 | + } | |
227 | + if (ruleEngineAdmin != null) { | |
228 | + ruleEngineAdmin.destroy(); | |
229 | + } | |
230 | + if (jsExecutorAdmin != null) { | |
231 | + jsExecutorAdmin.destroy(); | |
232 | + } | |
233 | + if (transportApiAdmin != null) { | |
234 | + transportApiAdmin.destroy(); | |
235 | + } | |
236 | + if (notificationAdmin != null) { | |
237 | + notificationAdmin.destroy(); | |
238 | + } | |
239 | + } | |
221 | 240 | } | ... | ... |
... | ... | @@ -45,6 +45,7 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings; |
45 | 45 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
46 | 46 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
47 | 47 | |
48 | +import javax.annotation.PreDestroy; | |
48 | 49 | import java.nio.charset.StandardCharsets; |
49 | 50 | |
50 | 51 | @Component |
... | ... | @@ -190,4 +191,20 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
190 | 191 | builder.pollInterval(jsInvokeSettings.getResponsePollInterval()); |
191 | 192 | return builder.build(); |
192 | 193 | } |
194 | + | |
195 | + @PreDestroy | |
196 | + private void destroy() { | |
197 | + if (coreAdmin != null) { | |
198 | + coreAdmin.destroy(); | |
199 | + } | |
200 | + if (ruleEngineAdmin != null) { | |
201 | + ruleEngineAdmin.destroy(); | |
202 | + } | |
203 | + if (jsExecutorAdmin != null) { | |
204 | + jsExecutorAdmin.destroy(); | |
205 | + } | |
206 | + if (notificationAdmin != null) { | |
207 | + notificationAdmin.destroy(); | |
208 | + } | |
209 | + } | |
193 | 210 | } | ... | ... |
... | ... | @@ -40,6 +40,8 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
40 | 40 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
41 | 41 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
42 | 42 | |
43 | +import javax.annotation.PreDestroy; | |
44 | + | |
43 | 45 | @Component |
44 | 46 | @ConditionalOnExpression("'${queue.type:null}'=='kafka' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") |
45 | 47 | @Slf4j |
... | ... | @@ -135,4 +137,20 @@ public class KafkaTbTransportQueueFactory implements TbTransportQueueFactory { |
135 | 137 | responseBuilder.admin(notificationAdmin); |
136 | 138 | return responseBuilder.build(); |
137 | 139 | } |
140 | + | |
141 | + @PreDestroy | |
142 | + private void destroy() { | |
143 | + if (coreAdmin != null) { | |
144 | + coreAdmin.destroy(); | |
145 | + } | |
146 | + if (ruleEngineAdmin != null) { | |
147 | + ruleEngineAdmin.destroy(); | |
148 | + } | |
149 | + if (transportApiAdmin != null) { | |
150 | + transportApiAdmin.destroy(); | |
151 | + } | |
152 | + if (notificationAdmin != null) { | |
153 | + notificationAdmin.destroy(); | |
154 | + } | |
155 | + } | |
138 | 156 | } | ... | ... |
common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java
... | ... | @@ -30,19 +30,25 @@ import org.thingsboard.server.queue.TbQueueAdmin; |
30 | 30 | import org.thingsboard.server.queue.TbQueueConsumer; |
31 | 31 | import org.thingsboard.server.queue.TbQueueProducer; |
32 | 32 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
33 | +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; | |
33 | 34 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; |
34 | 35 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; |
36 | +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; | |
35 | 37 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; |
36 | 38 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
37 | 39 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
38 | 40 | import org.thingsboard.server.queue.discovery.PartitionService; |
39 | 41 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
42 | +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; | |
43 | +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; | |
40 | 44 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
41 | 45 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
42 | 46 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
43 | 47 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
44 | 48 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
45 | 49 | |
50 | +import javax.annotation.PreDestroy; | |
51 | + | |
46 | 52 | @Component |
47 | 53 | @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='monolith'") |
48 | 54 | public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { |
... | ... | @@ -54,7 +60,12 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul |
54 | 60 | private final TbQueueTransportApiSettings transportApiSettings; |
55 | 61 | private final TbQueueTransportNotificationSettings transportNotificationSettings; |
56 | 62 | private final TbServiceBusSettings serviceBusSettings; |
57 | - private final TbQueueAdmin admin; | |
63 | + | |
64 | + private final TbQueueAdmin coreAdmin; | |
65 | + private final TbQueueAdmin ruleEngineAdmin; | |
66 | + private final TbQueueAdmin jsExecutorAdmin; | |
67 | + private final TbQueueAdmin transportApiAdmin; | |
68 | + private final TbQueueAdmin notificationAdmin; | |
58 | 69 | |
59 | 70 | public ServiceBusMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, |
60 | 71 | TbQueueRuleEngineSettings ruleEngineSettings, |
... | ... | @@ -62,7 +73,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul |
62 | 73 | TbQueueTransportApiSettings transportApiSettings, |
63 | 74 | TbQueueTransportNotificationSettings transportNotificationSettings, |
64 | 75 | TbServiceBusSettings serviceBusSettings, |
65 | - TbQueueAdmin admin) { | |
76 | + TbServiceBusQueueConfigs serviceBusQueueConfigs) { | |
66 | 77 | this.partitionService = partitionService; |
67 | 78 | this.coreSettings = coreSettings; |
68 | 79 | this.serviceInfoProvider = serviceInfoProvider; |
... | ... | @@ -70,73 +81,97 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul |
70 | 81 | this.transportApiSettings = transportApiSettings; |
71 | 82 | this.transportNotificationSettings = transportNotificationSettings; |
72 | 83 | this.serviceBusSettings = serviceBusSettings; |
73 | - this.admin = admin; | |
84 | + | |
85 | + this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); | |
86 | + this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); | |
87 | + this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs()); | |
88 | + this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs()); | |
89 | + this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs()); | |
74 | 90 | } |
75 | 91 | |
76 | 92 | @Override |
77 | 93 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
78 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic()); | |
94 | + return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic()); | |
79 | 95 | } |
80 | 96 | |
81 | 97 | @Override |
82 | 98 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
83 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); | |
99 | + return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic()); | |
84 | 100 | } |
85 | 101 | |
86 | 102 | @Override |
87 | 103 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
88 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); | |
104 | + return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic()); | |
89 | 105 | } |
90 | 106 | |
91 | 107 | @Override |
92 | 108 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
93 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
109 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
94 | 110 | } |
95 | 111 | |
96 | 112 | @Override |
97 | 113 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
98 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
114 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
99 | 115 | } |
100 | 116 | |
101 | 117 | @Override |
102 | 118 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
103 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), | |
119 | + return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(), | |
104 | 120 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); |
105 | 121 | } |
106 | 122 | |
107 | 123 | @Override |
108 | 124 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
109 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, | |
125 | + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, | |
110 | 126 | partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
111 | 127 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
112 | 128 | } |
113 | 129 | |
114 | 130 | @Override |
115 | 131 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { |
116 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), | |
132 | + return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic(), | |
117 | 133 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); |
118 | 134 | } |
119 | 135 | |
120 | 136 | @Override |
121 | 137 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
122 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, | |
138 | + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, | |
123 | 139 | partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
124 | 140 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
125 | 141 | } |
126 | 142 | |
127 | 143 | @Override |
128 | 144 | public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
129 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), | |
145 | + return new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic(), | |
130 | 146 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); |
131 | 147 | } |
132 | 148 | |
133 | 149 | @Override |
134 | 150 | public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { |
135 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getResponsesTopic()); | |
151 | + return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getResponsesTopic()); | |
136 | 152 | } |
137 | 153 | |
138 | 154 | @Override |
139 | 155 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
140 | 156 | return null; |
141 | 157 | } |
158 | + | |
159 | + @PreDestroy | |
160 | + private void destroy() { | |
161 | + if (coreAdmin != null) { | |
162 | + coreAdmin.destroy(); | |
163 | + } | |
164 | + if (ruleEngineAdmin != null) { | |
165 | + ruleEngineAdmin.destroy(); | |
166 | + } | |
167 | + if (jsExecutorAdmin != null) { | |
168 | + jsExecutorAdmin.destroy(); | |
169 | + } | |
170 | + if (transportApiAdmin != null) { | |
171 | + transportApiAdmin.destroy(); | |
172 | + } | |
173 | + if (notificationAdmin != null) { | |
174 | + notificationAdmin.destroy(); | |
175 | + } | |
176 | + } | |
142 | 177 | } | ... | ... |
... | ... | @@ -29,8 +29,10 @@ import org.thingsboard.server.queue.TbQueueAdmin; |
29 | 29 | import org.thingsboard.server.queue.TbQueueConsumer; |
30 | 30 | import org.thingsboard.server.queue.TbQueueProducer; |
31 | 31 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
32 | +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; | |
32 | 33 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; |
33 | 34 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; |
35 | +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; | |
34 | 36 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; |
35 | 37 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
36 | 38 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
... | ... | @@ -40,6 +42,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
40 | 42 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
41 | 43 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
42 | 44 | |
45 | +import javax.annotation.PreDestroy; | |
46 | + | |
43 | 47 | @Component |
44 | 48 | @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-core'") |
45 | 49 | public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { |
... | ... | @@ -50,7 +54,12 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { |
50 | 54 | private final TbQueueTransportApiSettings transportApiSettings; |
51 | 55 | private final PartitionService partitionService; |
52 | 56 | private final TbServiceInfoProvider serviceInfoProvider; |
53 | - private final TbQueueAdmin admin; | |
57 | + | |
58 | + private final TbQueueAdmin coreAdmin; | |
59 | + private final TbQueueAdmin ruleEngineAdmin; | |
60 | + private final TbQueueAdmin jsExecutorAdmin; | |
61 | + private final TbQueueAdmin transportApiAdmin; | |
62 | + private final TbQueueAdmin notificationAdmin; | |
54 | 63 | |
55 | 64 | public ServiceBusTbCoreQueueFactory(TbServiceBusSettings serviceBusSettings, |
56 | 65 | TbQueueCoreSettings coreSettings, |
... | ... | @@ -58,67 +67,91 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { |
58 | 67 | TbQueueRuleEngineSettings ruleEngineSettings, |
59 | 68 | PartitionService partitionService, |
60 | 69 | TbServiceInfoProvider serviceInfoProvider, |
61 | - TbQueueAdmin admin) { | |
70 | + TbServiceBusQueueConfigs serviceBusQueueConfigs) { | |
62 | 71 | this.serviceBusSettings = serviceBusSettings; |
63 | 72 | this.coreSettings = coreSettings; |
64 | 73 | this.transportApiSettings = transportApiSettings; |
65 | 74 | this.ruleEngineSettings = ruleEngineSettings; |
66 | 75 | this.partitionService = partitionService; |
67 | 76 | this.serviceInfoProvider = serviceInfoProvider; |
68 | - this.admin = admin; | |
77 | + | |
78 | + this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); | |
79 | + this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); | |
80 | + this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs()); | |
81 | + this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs()); | |
82 | + this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs()); | |
69 | 83 | } |
70 | 84 | |
71 | 85 | @Override |
72 | 86 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
73 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
87 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
74 | 88 | } |
75 | 89 | |
76 | 90 | @Override |
77 | 91 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
78 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
92 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
79 | 93 | } |
80 | 94 | |
81 | 95 | @Override |
82 | 96 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
83 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); | |
97 | + return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic()); | |
84 | 98 | } |
85 | 99 | |
86 | 100 | @Override |
87 | 101 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
88 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
102 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
89 | 103 | } |
90 | 104 | |
91 | 105 | @Override |
92 | 106 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
93 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
107 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
94 | 108 | } |
95 | 109 | |
96 | 110 | @Override |
97 | 111 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { |
98 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic(), | |
112 | + return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic(), | |
99 | 113 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); |
100 | 114 | } |
101 | 115 | |
102 | 116 | @Override |
103 | 117 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
104 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, | |
118 | + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, | |
105 | 119 | partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
106 | 120 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
107 | 121 | } |
108 | 122 | |
109 | 123 | @Override |
110 | 124 | public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
111 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic(), | |
125 | + return new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic(), | |
112 | 126 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); |
113 | 127 | } |
114 | 128 | |
115 | 129 | @Override |
116 | 130 | public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { |
117 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
131 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
118 | 132 | } |
119 | 133 | |
120 | 134 | @Override |
121 | 135 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
122 | 136 | return null; |
123 | 137 | } |
138 | + | |
139 | + @PreDestroy | |
140 | + private void destroy() { | |
141 | + if (coreAdmin != null) { | |
142 | + coreAdmin.destroy(); | |
143 | + } | |
144 | + if (ruleEngineAdmin != null) { | |
145 | + ruleEngineAdmin.destroy(); | |
146 | + } | |
147 | + if (jsExecutorAdmin != null) { | |
148 | + jsExecutorAdmin.destroy(); | |
149 | + } | |
150 | + if (transportApiAdmin != null) { | |
151 | + transportApiAdmin.destroy(); | |
152 | + } | |
153 | + if (notificationAdmin != null) { | |
154 | + notificationAdmin.destroy(); | |
155 | + } | |
156 | + } | |
124 | 157 | } | ... | ... |
... | ... | @@ -27,8 +27,10 @@ import org.thingsboard.server.queue.TbQueueAdmin; |
27 | 27 | import org.thingsboard.server.queue.TbQueueConsumer; |
28 | 28 | import org.thingsboard.server.queue.TbQueueProducer; |
29 | 29 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
30 | +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; | |
30 | 31 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; |
31 | 32 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; |
33 | +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; | |
32 | 34 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; |
33 | 35 | import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; |
34 | 36 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
... | ... | @@ -38,6 +40,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
38 | 40 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
39 | 41 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
40 | 42 | |
43 | +import javax.annotation.PreDestroy; | |
44 | + | |
41 | 45 | @Component |
42 | 46 | @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && '${service.type:null}'=='tb-rule-engine'") |
43 | 47 | public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
... | ... | @@ -47,55 +51,63 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact |
47 | 51 | private final TbServiceInfoProvider serviceInfoProvider; |
48 | 52 | private final TbQueueRuleEngineSettings ruleEngineSettings; |
49 | 53 | private final TbServiceBusSettings serviceBusSettings; |
50 | - private final TbQueueAdmin admin; | |
54 | + | |
55 | + private final TbQueueAdmin coreAdmin; | |
56 | + private final TbQueueAdmin ruleEngineAdmin; | |
57 | + private final TbQueueAdmin jsExecutorAdmin; | |
58 | + private final TbQueueAdmin notificationAdmin; | |
51 | 59 | |
52 | 60 | public ServiceBusTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, |
53 | 61 | TbQueueRuleEngineSettings ruleEngineSettings, |
54 | 62 | TbServiceInfoProvider serviceInfoProvider, |
55 | 63 | TbServiceBusSettings serviceBusSettings, |
56 | - TbQueueAdmin admin) { | |
64 | + TbServiceBusQueueConfigs serviceBusQueueConfigs) { | |
57 | 65 | this.partitionService = partitionService; |
58 | 66 | this.coreSettings = coreSettings; |
59 | 67 | this.serviceInfoProvider = serviceInfoProvider; |
60 | 68 | this.ruleEngineSettings = ruleEngineSettings; |
61 | 69 | this.serviceBusSettings = serviceBusSettings; |
62 | - this.admin = admin; | |
70 | + | |
71 | + this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); | |
72 | + this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs()); | |
73 | + this.jsExecutorAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getJsExecutorConfigs()); | |
74 | + this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs()); | |
63 | 75 | } |
64 | 76 | |
65 | 77 | @Override |
66 | 78 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
67 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
79 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
68 | 80 | } |
69 | 81 | |
70 | 82 | @Override |
71 | 83 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
72 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
84 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
73 | 85 | } |
74 | 86 | |
75 | 87 | @Override |
76 | 88 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
77 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic()); | |
89 | + return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic()); | |
78 | 90 | } |
79 | 91 | |
80 | 92 | @Override |
81 | 93 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
82 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
94 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
83 | 95 | } |
84 | 96 | |
85 | 97 | @Override |
86 | 98 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
87 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
99 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
88 | 100 | } |
89 | 101 | |
90 | 102 | @Override |
91 | 103 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
92 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, ruleEngineSettings.getTopic(), | |
104 | + return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(), | |
93 | 105 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); |
94 | 106 | } |
95 | 107 | |
96 | 108 | @Override |
97 | 109 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
98 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, | |
110 | + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, | |
99 | 111 | partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
100 | 112 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
101 | 113 | } |
... | ... | @@ -104,4 +116,20 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact |
104 | 116 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
105 | 117 | return null; |
106 | 118 | } |
119 | + | |
120 | + @PreDestroy | |
121 | + private void destroy() { | |
122 | + if (coreAdmin != null) { | |
123 | + coreAdmin.destroy(); | |
124 | + } | |
125 | + if (ruleEngineAdmin != null) { | |
126 | + ruleEngineAdmin.destroy(); | |
127 | + } | |
128 | + if (jsExecutorAdmin != null) { | |
129 | + jsExecutorAdmin.destroy(); | |
130 | + } | |
131 | + if (notificationAdmin != null) { | |
132 | + notificationAdmin.destroy(); | |
133 | + } | |
134 | + } | |
107 | 135 | } | ... | ... |
... | ... | @@ -23,6 +23,8 @@ import org.thingsboard.server.queue.TbQueueAdmin; |
23 | 23 | import org.thingsboard.server.queue.TbQueueConsumer; |
24 | 24 | import org.thingsboard.server.queue.TbQueueProducer; |
25 | 25 | import org.thingsboard.server.queue.TbQueueRequestTemplate; |
26 | +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusAdmin; | |
27 | +import org.thingsboard.server.queue.azure.servicebus.TbServiceBusQueueConfigs; | |
26 | 28 | import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; |
27 | 29 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
28 | 30 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
... | ... | @@ -33,6 +35,8 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
33 | 35 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
34 | 36 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
35 | 37 | |
38 | +import javax.annotation.PreDestroy; | |
39 | + | |
36 | 40 | @Component |
37 | 41 | @ConditionalOnExpression("'${queue.type:null}'=='service-bus' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") |
38 | 42 | @Slf4j |
... | ... | @@ -40,37 +44,43 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory |
40 | 44 | private final TbQueueTransportApiSettings transportApiSettings; |
41 | 45 | private final TbQueueTransportNotificationSettings transportNotificationSettings; |
42 | 46 | private final TbServiceBusSettings serviceBusSettings; |
43 | - private final TbQueueAdmin admin; | |
44 | 47 | private final TbServiceInfoProvider serviceInfoProvider; |
45 | 48 | private final TbQueueCoreSettings coreSettings; |
46 | 49 | |
50 | + private final TbQueueAdmin coreAdmin; | |
51 | + private final TbQueueAdmin transportApiAdmin; | |
52 | + private final TbQueueAdmin notificationAdmin; | |
53 | + | |
47 | 54 | public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, |
48 | - TbQueueTransportNotificationSettings transportNotificationSettings, | |
49 | - TbServiceBusSettings serviceBusSettings, | |
50 | - TbServiceInfoProvider serviceInfoProvider, | |
55 | + TbQueueTransportNotificationSettings transportNotificationSettings, | |
56 | + TbServiceBusSettings serviceBusSettings, | |
57 | + TbServiceInfoProvider serviceInfoProvider, | |
51 | 58 | TbQueueCoreSettings coreSettings, |
52 | - TbQueueAdmin admin) { | |
59 | + TbServiceBusQueueConfigs serviceBusQueueConfigs) { | |
53 | 60 | this.transportApiSettings = transportApiSettings; |
54 | 61 | this.transportNotificationSettings = transportNotificationSettings; |
55 | 62 | this.serviceBusSettings = serviceBusSettings; |
56 | - this.admin = admin; | |
57 | 63 | this.serviceInfoProvider = serviceInfoProvider; |
58 | 64 | this.coreSettings = coreSettings; |
65 | + | |
66 | + this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs()); | |
67 | + this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs()); | |
68 | + this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs()); | |
59 | 69 | } |
60 | 70 | |
61 | 71 | @Override |
62 | 72 | public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiRequestTemplate() { |
63 | 73 | TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> producerTemplate = |
64 | - new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); | |
74 | + new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic()); | |
65 | 75 | |
66 | 76 | TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> consumerTemplate = |
67 | - new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, | |
77 | + new TbServiceBusConsumerTemplate<>(transportApiAdmin, serviceBusSettings, | |
68 | 78 | transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), |
69 | 79 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); |
70 | 80 | |
71 | 81 | DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder |
72 | 82 | <TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); |
73 | - templateBuilder.queueAdmin(admin); | |
83 | + templateBuilder.queueAdmin(transportApiAdmin); | |
74 | 84 | templateBuilder.requestTemplate(producerTemplate); |
75 | 85 | templateBuilder.responseTemplate(consumerTemplate); |
76 | 86 | templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); |
... | ... | @@ -81,18 +91,31 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory |
81 | 91 | |
82 | 92 | @Override |
83 | 93 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
84 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); | |
94 | + return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic()); | |
85 | 95 | } |
86 | 96 | |
87 | 97 | @Override |
88 | 98 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() { |
89 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); | |
99 | + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic()); | |
90 | 100 | } |
91 | 101 | |
92 | 102 | @Override |
93 | 103 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsConsumer() { |
94 | - return new TbServiceBusConsumerTemplate<>(admin, serviceBusSettings, | |
104 | + return new TbServiceBusConsumerTemplate<>(notificationAdmin, serviceBusSettings, | |
95 | 105 | transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), |
96 | 106 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); |
97 | 107 | } |
108 | + | |
109 | + @PreDestroy | |
110 | + private void destroy() { | |
111 | + if (coreAdmin != null) { | |
112 | + coreAdmin.destroy(); | |
113 | + } | |
114 | + if (transportApiAdmin != null) { | |
115 | + transportApiAdmin.destroy(); | |
116 | + } | |
117 | + if (notificationAdmin != null) { | |
118 | + notificationAdmin.destroy(); | |
119 | + } | |
120 | + } | |
98 | 121 | } | ... | ... |
... | ... | @@ -42,6 +42,7 @@ |
42 | 42 | |
43 | 43 | <logger name="org.thingsboard.server" level="INFO" /> |
44 | 44 | <logger name="akka" level="INFO" /> |
45 | + <logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" /> | |
45 | 46 | |
46 | 47 | <root level="INFO"> |
47 | 48 | <appender-ref ref="fileLogAppender"/> | ... | ... |
... | ... | @@ -907,6 +907,12 @@ |
907 | 907 | <groupId>com.microsoft.azure</groupId> |
908 | 908 | <artifactId>azure-servicebus</artifactId> |
909 | 909 | <version>${azure-servicebus.version}</version> |
910 | + <exclusions> | |
911 | + <exclusion> | |
912 | + <groupId>com.microsoft.azure</groupId> | |
913 | + <artifactId>adal4j</artifactId> | |
914 | + </exclusion> | |
915 | + </exclusions> | |
910 | 916 | </dependency> |
911 | 917 | <dependency> |
912 | 918 | <groupId>org.passay</groupId> | ... | ... |
... | ... | @@ -17,10 +17,20 @@ |
17 | 17 | spring.main.web-environment: false |
18 | 18 | spring.main.web-application-type: none |
19 | 19 | |
20 | -# Clustering properties | |
21 | -cluster: | |
22 | - # Unique id for this node (autogenerated if empty) | |
23 | - node_id: "${CLUSTER_NODE_ID:}" | |
20 | +# Zookeeper connection parameters. Used for service discovery. | |
21 | +zk: | |
22 | + # Enable/disable zookeeper discovery service. | |
23 | + enabled: "${ZOOKEEPER_ENABLED:false}" | |
24 | + # Zookeeper connect string | |
25 | + url: "${ZOOKEEPER_URL:localhost:2181}" | |
26 | + # Zookeeper retry interval in milliseconds | |
27 | + retry_interval_ms: "${ZOOKEEPER_RETRY_INTERVAL_MS:3000}" | |
28 | + # Zookeeper connection timeout in milliseconds | |
29 | + connection_timeout_ms: "${ZOOKEEPER_CONNECTION_TIMEOUT_MS:3000}" | |
30 | + # Zookeeper session timeout in milliseconds | |
31 | + session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" | |
32 | + # Name of the directory in zookeeper 'filesystem' | |
33 | + zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" | |
24 | 34 | |
25 | 35 | # COAP server parameters |
26 | 36 | transport: |
... | ... | @@ -42,7 +52,7 @@ transport: |
42 | 52 | max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" |
43 | 53 | |
44 | 54 | queue: |
45 | - type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub or service-bus or rabbitmq | |
55 | + type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) | |
46 | 56 | kafka: |
47 | 57 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" |
48 | 58 | acks: "${TB_KAFKA_ACKS:all}" |
... | ... | @@ -85,6 +95,12 @@ queue: |
85 | 95 | sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" |
86 | 96 | sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" |
87 | 97 | max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" |
98 | + queue-properties: | |
99 | + rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
100 | + core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
101 | + transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
102 | + notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
103 | + js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
88 | 104 | rabbitmq: |
89 | 105 | exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" |
90 | 106 | host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" | ... | ... |
... | ... | @@ -20,10 +20,20 @@ server: |
20 | 20 | # Server bind port |
21 | 21 | port: "${HTTP_BIND_PORT:8081}" |
22 | 22 | |
23 | -# Clustering properties | |
24 | -cluster: | |
25 | - # Unique id for this node (autogenerated if empty) | |
26 | - node_id: "${CLUSTER_NODE_ID:}" | |
23 | +# Zookeeper connection parameters. Used for service discovery. | |
24 | +zk: | |
25 | + # Enable/disable zookeeper discovery service. | |
26 | + enabled: "${ZOOKEEPER_ENABLED:false}" | |
27 | + # Zookeeper connect string | |
28 | + url: "${ZOOKEEPER_URL:localhost:2181}" | |
29 | + # Zookeeper retry interval in milliseconds | |
30 | + retry_interval_ms: "${ZOOKEEPER_RETRY_INTERVAL_MS:3000}" | |
31 | + # Zookeeper connection timeout in milliseconds | |
32 | + connection_timeout_ms: "${ZOOKEEPER_CONNECTION_TIMEOUT_MS:3000}" | |
33 | + # Zookeeper session timeout in milliseconds | |
34 | + session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}" | |
35 | + # Name of the directory in zookeeper 'filesystem' | |
36 | + zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" | |
27 | 37 | |
28 | 38 | # HTTP server parameters |
29 | 39 | transport: |
... | ... | @@ -43,7 +53,7 @@ transport: |
43 | 53 | max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" |
44 | 54 | |
45 | 55 | queue: |
46 | - type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub or service-bus or rabbitmq | |
56 | + type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) | |
47 | 57 | kafka: |
48 | 58 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" |
49 | 59 | acks: "${TB_KAFKA_ACKS:all}" |
... | ... | @@ -86,6 +96,12 @@ queue: |
86 | 96 | sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" |
87 | 97 | sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" |
88 | 98 | max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" |
99 | + queue-properties: | |
100 | + rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
101 | + core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
102 | + transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
103 | + notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
104 | + js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
89 | 105 | rabbitmq: |
90 | 106 | exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" |
91 | 107 | host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" | ... | ... |
... | ... | @@ -73,7 +73,7 @@ transport: |
73 | 73 | max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" |
74 | 74 | |
75 | 75 | queue: |
76 | - type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub or service-bus or rabbitmq | |
76 | + type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) | |
77 | 77 | kafka: |
78 | 78 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" |
79 | 79 | acks: "${TB_KAFKA_ACKS:all}" |
... | ... | @@ -116,6 +116,12 @@ queue: |
116 | 116 | sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}" |
117 | 117 | sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}" |
118 | 118 | max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}" |
119 | + queue-properties: | |
120 | + rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
121 | + core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
122 | + transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
123 | + notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
124 | + js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}" | |
119 | 125 | rabbitmq: |
120 | 126 | exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}" |
121 | 127 | host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}" | ... | ... |