Commit 283ad27cb54d476e011b92e4de94aac465a89c39
Committed by
Andrew Shvayka
1 parent
2ad4ddf1
added rabbitmq queue arguments
Showing
13 changed files
with
319 additions
and
56 deletions
@@ -566,6 +566,12 @@ queue: | @@ -566,6 +566,12 @@ queue: | ||
566 | automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" | 566 | automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" |
567 | connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" | 567 | connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" |
568 | handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" | 568 | handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" |
569 | + queue-properties: | ||
570 | + rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
571 | + core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
572 | + transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
573 | + notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
574 | + js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
569 | partitions: | 575 | partitions: |
570 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" | 576 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" |
571 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" | 577 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" |
@@ -30,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; | @@ -30,6 +30,7 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; | ||
30 | import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | 30 | import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; |
31 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 31 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
32 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 32 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
33 | +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | ||
33 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | 34 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
34 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; | 35 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
35 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; | 36 | import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; |
@@ -47,8 +48,10 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { | @@ -47,8 +48,10 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { | ||
47 | private final TbQueueTransportApiSettings transportApiSettings; | 48 | private final TbQueueTransportApiSettings transportApiSettings; |
48 | private final TbQueueTransportNotificationSettings transportNotificationSettings; | 49 | private final TbQueueTransportNotificationSettings transportNotificationSettings; |
49 | private final TbAwsSqsSettings sqsSettings; | 50 | private final TbAwsSqsSettings sqsSettings; |
51 | + private final TbQueueCoreSettings coreSettings; | ||
50 | private final TbServiceInfoProvider serviceInfoProvider; | 52 | private final TbServiceInfoProvider serviceInfoProvider; |
51 | 53 | ||
54 | + private final TbQueueAdmin coreAdmin; | ||
52 | private final TbQueueAdmin transportApiAdmin; | 55 | private final TbQueueAdmin transportApiAdmin; |
53 | private final TbQueueAdmin notificationAdmin; | 56 | private final TbQueueAdmin notificationAdmin; |
54 | 57 | ||
@@ -56,12 +59,15 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { | @@ -56,12 +59,15 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { | ||
56 | TbQueueTransportNotificationSettings transportNotificationSettings, | 59 | TbQueueTransportNotificationSettings transportNotificationSettings, |
57 | TbAwsSqsSettings sqsSettings, | 60 | TbAwsSqsSettings sqsSettings, |
58 | TbServiceInfoProvider serviceInfoProvider, | 61 | TbServiceInfoProvider serviceInfoProvider, |
62 | + TbQueueCoreSettings coreSettings, | ||
59 | TbAwsSqsQueueAttributes sqsQueueAttributes) { | 63 | TbAwsSqsQueueAttributes sqsQueueAttributes) { |
60 | this.transportApiSettings = transportApiSettings; | 64 | this.transportApiSettings = transportApiSettings; |
61 | this.transportNotificationSettings = transportNotificationSettings; | 65 | this.transportNotificationSettings = transportNotificationSettings; |
62 | this.sqsSettings = sqsSettings; | 66 | this.sqsSettings = sqsSettings; |
63 | this.serviceInfoProvider = serviceInfoProvider; | 67 | this.serviceInfoProvider = serviceInfoProvider; |
68 | + this.coreSettings = coreSettings; | ||
64 | 69 | ||
70 | + this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes()); | ||
65 | this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes()); | 71 | this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes()); |
66 | this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes()); | 72 | this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes()); |
67 | } | 73 | } |
@@ -94,7 +100,7 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { | @@ -94,7 +100,7 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { | ||
94 | 100 | ||
95 | @Override | 101 | @Override |
96 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { | 102 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
97 | - return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic()); | 103 | + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic()); |
98 | } | 104 | } |
99 | 105 | ||
100 | @Override | 106 | @Override |
@@ -105,6 +111,9 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { | @@ -105,6 +111,9 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory { | ||
105 | 111 | ||
106 | @PreDestroy | 112 | @PreDestroy |
107 | private void destroy() { | 113 | private void destroy() { |
114 | + if (coreAdmin != null) { | ||
115 | + coreAdmin.destroy(); | ||
116 | + } | ||
108 | if (transportApiAdmin != null) { | 117 | if (transportApiAdmin != null) { |
109 | transportApiAdmin.destroy(); | 118 | transportApiAdmin.destroy(); |
110 | } | 119 | } |
@@ -32,9 +32,9 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; | @@ -32,9 +32,9 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; | ||
32 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 32 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
33 | import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; | 33 | import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; |
34 | import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; | 34 | import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; |
35 | +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | ||
35 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | 36 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
36 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; | 37 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
37 | -import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin; | ||
38 | 38 | ||
39 | @Component | 39 | @Component |
40 | @ConditionalOnExpression("'${queue.type:null}'=='in-memory' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") | 40 | @ConditionalOnExpression("'${queue.type:null}'=='in-memory' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") |
@@ -43,13 +43,16 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory | @@ -43,13 +43,16 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory | ||
43 | private final TbQueueTransportApiSettings transportApiSettings; | 43 | private final TbQueueTransportApiSettings transportApiSettings; |
44 | private final TbQueueTransportNotificationSettings transportNotificationSettings; | 44 | private final TbQueueTransportNotificationSettings transportNotificationSettings; |
45 | private final TbServiceInfoProvider serviceInfoProvider; | 45 | private final TbServiceInfoProvider serviceInfoProvider; |
46 | + private final TbQueueCoreSettings coreSettings; | ||
46 | 47 | ||
47 | public InMemoryTbTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, | 48 | public InMemoryTbTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, |
48 | TbQueueTransportNotificationSettings transportNotificationSettings, | 49 | TbQueueTransportNotificationSettings transportNotificationSettings, |
49 | - TbServiceInfoProvider serviceInfoProvider) { | 50 | + TbServiceInfoProvider serviceInfoProvider, |
51 | + TbQueueCoreSettings coreSettings) { | ||
50 | this.transportApiSettings = transportApiSettings; | 52 | this.transportApiSettings = transportApiSettings; |
51 | this.transportNotificationSettings = transportNotificationSettings; | 53 | this.transportNotificationSettings = transportNotificationSettings; |
52 | this.serviceInfoProvider = serviceInfoProvider; | 54 | this.serviceInfoProvider = serviceInfoProvider; |
55 | + this.coreSettings = coreSettings; | ||
53 | } | 56 | } |
54 | 57 | ||
55 | @Override | 58 | @Override |
@@ -86,7 +89,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory | @@ -86,7 +89,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory | ||
86 | 89 | ||
87 | @Override | 90 | @Override |
88 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { | 91 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
89 | - return new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic()); | 92 | + return new InMemoryTbQueueProducer<>(coreSettings.getTopic()); |
90 | } | 93 | } |
91 | 94 | ||
92 | @Override | 95 | @Override |
@@ -28,8 +28,10 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | @@ -28,8 +28,10 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | ||
28 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 28 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
29 | import org.thingsboard.server.queue.discovery.PartitionService; | 29 | import org.thingsboard.server.queue.discovery.PartitionService; |
30 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 30 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
31 | +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; | ||
31 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; | 32 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; |
32 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; | 33 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; |
34 | +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; | ||
33 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; | 35 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; |
34 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 36 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
35 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 37 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
@@ -37,6 +39,8 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | @@ -37,6 +39,8 @@ import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | ||
37 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; | 39 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
38 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; | 40 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
39 | 41 | ||
42 | +import javax.annotation.PreDestroy; | ||
43 | + | ||
40 | @Component | 44 | @Component |
41 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='monolith'") | 45 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='monolith'") |
42 | public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { | 46 | public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory { |
@@ -48,7 +52,12 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE | @@ -48,7 +52,12 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE | ||
48 | private final TbQueueTransportApiSettings transportApiSettings; | 52 | private final TbQueueTransportApiSettings transportApiSettings; |
49 | private final TbQueueTransportNotificationSettings transportNotificationSettings; | 53 | private final TbQueueTransportNotificationSettings transportNotificationSettings; |
50 | private final TbRabbitMqSettings rabbitMqSettings; | 54 | private final TbRabbitMqSettings rabbitMqSettings; |
51 | - private final TbQueueAdmin admin; | 55 | + |
56 | + private final TbQueueAdmin coreAdmin; | ||
57 | + private final TbQueueAdmin ruleEngineAdmin; | ||
58 | + private final TbQueueAdmin jsExecutorAdmin; | ||
59 | + private final TbQueueAdmin transportApiAdmin; | ||
60 | + private final TbQueueAdmin notificationAdmin; | ||
52 | 61 | ||
53 | public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, | 62 | public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, |
54 | TbQueueRuleEngineSettings ruleEngineSettings, | 63 | TbQueueRuleEngineSettings ruleEngineSettings, |
@@ -56,7 +65,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE | @@ -56,7 +65,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE | ||
56 | TbQueueTransportApiSettings transportApiSettings, | 65 | TbQueueTransportApiSettings transportApiSettings, |
57 | TbQueueTransportNotificationSettings transportNotificationSettings, | 66 | TbQueueTransportNotificationSettings transportNotificationSettings, |
58 | TbRabbitMqSettings rabbitMqSettings, | 67 | TbRabbitMqSettings rabbitMqSettings, |
59 | - TbQueueAdmin admin) { | 68 | + TbRabbitMqQueueArguments queueArguments) { |
60 | this.partitionService = partitionService; | 69 | this.partitionService = partitionService; |
61 | this.coreSettings = coreSettings; | 70 | this.coreSettings = coreSettings; |
62 | this.serviceInfoProvider = serviceInfoProvider; | 71 | this.serviceInfoProvider = serviceInfoProvider; |
@@ -64,73 +73,97 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE | @@ -64,73 +73,97 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE | ||
64 | this.transportApiSettings = transportApiSettings; | 73 | this.transportApiSettings = transportApiSettings; |
65 | this.transportNotificationSettings = transportNotificationSettings; | 74 | this.transportNotificationSettings = transportNotificationSettings; |
66 | this.rabbitMqSettings = rabbitMqSettings; | 75 | this.rabbitMqSettings = rabbitMqSettings; |
67 | - this.admin = admin; | 76 | + |
77 | + this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); | ||
78 | + this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); | ||
79 | + this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs()); | ||
80 | + this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs()); | ||
81 | + this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); | ||
68 | } | 82 | } |
69 | 83 | ||
70 | @Override | 84 | @Override |
71 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsMsgProducer() { | 85 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsMsgProducer() { |
72 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic()); | 86 | + return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic()); |
73 | } | 87 | } |
74 | 88 | ||
75 | @Override | 89 | @Override |
76 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() { | 90 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
77 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic()); | 91 | + return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic()); |
78 | } | 92 | } |
79 | 93 | ||
80 | @Override | 94 | @Override |
81 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { | 95 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
82 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic()); | 96 | + return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic()); |
83 | } | 97 | } |
84 | 98 | ||
85 | @Override | 99 | @Override |
86 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() { | 100 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() { |
87 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 101 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
88 | } | 102 | } |
89 | 103 | ||
90 | @Override | 104 | @Override |
91 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { | 105 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
92 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 106 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
93 | } | 107 | } |
94 | 108 | ||
95 | @Override | 109 | @Override |
96 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { | 110 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
97 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic(), | 111 | + return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(), |
98 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); | 112 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); |
99 | } | 113 | } |
100 | 114 | ||
101 | @Override | 115 | @Override |
102 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { | 116 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
103 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, | 117 | + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, |
104 | partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), | 118 | partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
105 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); | 119 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
106 | } | 120 | } |
107 | 121 | ||
108 | @Override | 122 | @Override |
109 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createToCoreMsgConsumer() { | 123 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createToCoreMsgConsumer() { |
110 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic(), | 124 | + return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic(), |
111 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); | 125 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); |
112 | } | 126 | } |
113 | 127 | ||
114 | @Override | 128 | @Override |
115 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { | 129 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
116 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, | 130 | + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, |
117 | partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), | 131 | partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
118 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); | 132 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
119 | } | 133 | } |
120 | 134 | ||
121 | @Override | 135 | @Override |
122 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() { | 136 | public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
123 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic(), | 137 | + return new TbRabbitMqConsumerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic(), |
124 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); | 138 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); |
125 | } | 139 | } |
126 | 140 | ||
127 | @Override | 141 | @Override |
128 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiResponseProducer() { | 142 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiResponseProducer() { |
129 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getResponsesTopic()); | 143 | + return new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getResponsesTopic()); |
130 | } | 144 | } |
131 | 145 | ||
132 | @Override | 146 | @Override |
133 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 147 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
134 | return null; | 148 | return null; |
135 | } | 149 | } |
150 | + | ||
151 | + @PreDestroy | ||
152 | + private void destroy() { | ||
153 | + if (coreAdmin != null) { | ||
154 | + coreAdmin.destroy(); | ||
155 | + } | ||
156 | + if (ruleEngineAdmin != null) { | ||
157 | + ruleEngineAdmin.destroy(); | ||
158 | + } | ||
159 | + if (jsExecutorAdmin != null) { | ||
160 | + jsExecutorAdmin.destroy(); | ||
161 | + } | ||
162 | + if (transportApiAdmin != null) { | ||
163 | + transportApiAdmin.destroy(); | ||
164 | + } | ||
165 | + if (notificationAdmin != null) { | ||
166 | + notificationAdmin.destroy(); | ||
167 | + } | ||
168 | + } | ||
136 | } | 169 | } |
@@ -34,13 +34,17 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | @@ -34,13 +34,17 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | ||
34 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 34 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
35 | import org.thingsboard.server.queue.discovery.PartitionService; | 35 | import org.thingsboard.server.queue.discovery.PartitionService; |
36 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 36 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
37 | +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; | ||
37 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; | 38 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; |
38 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; | 39 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; |
40 | +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; | ||
39 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; | 41 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; |
40 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 42 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
41 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 43 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
42 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | 44 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
43 | 45 | ||
46 | +import javax.annotation.PreDestroy; | ||
47 | + | ||
44 | @Component | 48 | @Component |
45 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'") | 49 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'") |
46 | public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | 50 | public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { |
@@ -51,7 +55,12 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -51,7 +55,12 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | ||
51 | private final TbQueueTransportApiSettings transportApiSettings; | 55 | private final TbQueueTransportApiSettings transportApiSettings; |
52 | private final PartitionService partitionService; | 56 | private final PartitionService partitionService; |
53 | private final TbServiceInfoProvider serviceInfoProvider; | 57 | private final TbServiceInfoProvider serviceInfoProvider; |
54 | - private final TbQueueAdmin admin; | 58 | + |
59 | + private final TbQueueAdmin coreAdmin; | ||
60 | + private final TbQueueAdmin ruleEngineAdmin; | ||
61 | + private final TbQueueAdmin jsExecutorAdmin; | ||
62 | + private final TbQueueAdmin transportApiAdmin; | ||
63 | + private final TbQueueAdmin notificationAdmin; | ||
55 | 64 | ||
56 | public RabbitMqTbCoreQueueFactory(TbRabbitMqSettings rabbitMqSettings, | 65 | public RabbitMqTbCoreQueueFactory(TbRabbitMqSettings rabbitMqSettings, |
57 | TbQueueCoreSettings coreSettings, | 66 | TbQueueCoreSettings coreSettings, |
@@ -59,67 +68,91 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | @@ -59,67 +68,91 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { | ||
59 | TbQueueRuleEngineSettings ruleEngineSettings, | 68 | TbQueueRuleEngineSettings ruleEngineSettings, |
60 | PartitionService partitionService, | 69 | PartitionService partitionService, |
61 | TbServiceInfoProvider serviceInfoProvider, | 70 | TbServiceInfoProvider serviceInfoProvider, |
62 | - TbQueueAdmin admin) { | 71 | + TbRabbitMqQueueArguments queueArguments) { |
63 | this.rabbitMqSettings = rabbitMqSettings; | 72 | this.rabbitMqSettings = rabbitMqSettings; |
64 | this.coreSettings = coreSettings; | 73 | this.coreSettings = coreSettings; |
65 | this.transportApiSettings = transportApiSettings; | 74 | this.transportApiSettings = transportApiSettings; |
66 | this.ruleEngineSettings = ruleEngineSettings; | 75 | this.ruleEngineSettings = ruleEngineSettings; |
67 | this.partitionService = partitionService; | 76 | this.partitionService = partitionService; |
68 | this.serviceInfoProvider = serviceInfoProvider; | 77 | this.serviceInfoProvider = serviceInfoProvider; |
69 | - this.admin = admin; | 78 | + |
79 | + this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); | ||
80 | + this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); | ||
81 | + this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs()); | ||
82 | + this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs()); | ||
83 | + this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); | ||
70 | } | 84 | } |
71 | 85 | ||
72 | @Override | 86 | @Override |
73 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { | 87 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
74 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 88 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
75 | } | 89 | } |
76 | 90 | ||
77 | @Override | 91 | @Override |
78 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { | 92 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
79 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 93 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
80 | } | 94 | } |
81 | 95 | ||
82 | @Override | 96 | @Override |
83 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { | 97 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
84 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic()); | 98 | + return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic()); |
85 | } | 99 | } |
86 | 100 | ||
87 | @Override | 101 | @Override |
88 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { | 102 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
89 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 103 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
90 | } | 104 | } |
91 | 105 | ||
92 | @Override | 106 | @Override |
93 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { | 107 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
94 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 108 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
95 | } | 109 | } |
96 | 110 | ||
97 | @Override | 111 | @Override |
98 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { | 112 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() { |
99 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic(), | 113 | + return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic(), |
100 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); | 114 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); |
101 | } | 115 | } |
102 | 116 | ||
103 | @Override | 117 | @Override |
104 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { | 118 | public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() { |
105 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, | 119 | + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, |
106 | partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), | 120 | partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
107 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); | 121 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
108 | } | 122 | } |
109 | 123 | ||
110 | @Override | 124 | @Override |
111 | public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { | 125 | public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() { |
112 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic(), | 126 | + return new TbRabbitMqConsumerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic(), |
113 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); | 127 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); |
114 | } | 128 | } |
115 | 129 | ||
116 | @Override | 130 | @Override |
117 | public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { | 131 | public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() { |
118 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 132 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
119 | } | 133 | } |
120 | 134 | ||
121 | @Override | 135 | @Override |
122 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 136 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
123 | return null; | 137 | return null; |
124 | } | 138 | } |
139 | + | ||
140 | + @PreDestroy | ||
141 | + private void destroy() { | ||
142 | + if (coreAdmin != null) { | ||
143 | + coreAdmin.destroy(); | ||
144 | + } | ||
145 | + if (ruleEngineAdmin != null) { | ||
146 | + ruleEngineAdmin.destroy(); | ||
147 | + } | ||
148 | + if (jsExecutorAdmin != null) { | ||
149 | + jsExecutorAdmin.destroy(); | ||
150 | + } | ||
151 | + if (transportApiAdmin != null) { | ||
152 | + transportApiAdmin.destroy(); | ||
153 | + } | ||
154 | + if (notificationAdmin != null) { | ||
155 | + notificationAdmin.destroy(); | ||
156 | + } | ||
157 | + } | ||
125 | } | 158 | } |
@@ -32,13 +32,17 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | @@ -32,13 +32,17 @@ import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; | ||
32 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 32 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
33 | import org.thingsboard.server.queue.discovery.PartitionService; | 33 | import org.thingsboard.server.queue.discovery.PartitionService; |
34 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 34 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
35 | +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; | ||
35 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; | 36 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; |
36 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; | 37 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; |
38 | +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; | ||
37 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; | 39 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; |
38 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | 40 | import org.thingsboard.server.queue.settings.TbQueueCoreSettings; |
39 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | 41 | import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; |
40 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; | 42 | import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; |
41 | 43 | ||
44 | +import javax.annotation.PreDestroy; | ||
45 | + | ||
42 | @Component | 46 | @Component |
43 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'") | 47 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'") |
44 | public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { | 48 | public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { |
@@ -48,55 +52,63 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | @@ -48,55 +52,63 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | ||
48 | private final TbServiceInfoProvider serviceInfoProvider; | 52 | private final TbServiceInfoProvider serviceInfoProvider; |
49 | private final TbQueueRuleEngineSettings ruleEngineSettings; | 53 | private final TbQueueRuleEngineSettings ruleEngineSettings; |
50 | private final TbRabbitMqSettings rabbitMqSettings; | 54 | private final TbRabbitMqSettings rabbitMqSettings; |
51 | - private final TbQueueAdmin admin; | 55 | + |
56 | + private final TbQueueAdmin coreAdmin; | ||
57 | + private final TbQueueAdmin ruleEngineAdmin; | ||
58 | + private final TbQueueAdmin jsExecutorAdmin; | ||
59 | + private final TbQueueAdmin notificationAdmin; | ||
52 | 60 | ||
53 | public RabbitMqTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, | 61 | public RabbitMqTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings, |
54 | TbQueueRuleEngineSettings ruleEngineSettings, | 62 | TbQueueRuleEngineSettings ruleEngineSettings, |
55 | TbServiceInfoProvider serviceInfoProvider, | 63 | TbServiceInfoProvider serviceInfoProvider, |
56 | TbRabbitMqSettings rabbitMqSettings, | 64 | TbRabbitMqSettings rabbitMqSettings, |
57 | - TbQueueAdmin admin) { | 65 | + TbRabbitMqQueueArguments queueArguments) { |
58 | this.partitionService = partitionService; | 66 | this.partitionService = partitionService; |
59 | this.coreSettings = coreSettings; | 67 | this.coreSettings = coreSettings; |
60 | this.serviceInfoProvider = serviceInfoProvider; | 68 | this.serviceInfoProvider = serviceInfoProvider; |
61 | this.ruleEngineSettings = ruleEngineSettings; | 69 | this.ruleEngineSettings = ruleEngineSettings; |
62 | this.rabbitMqSettings = rabbitMqSettings; | 70 | this.rabbitMqSettings = rabbitMqSettings; |
63 | - this.admin = admin; | 71 | + |
72 | + this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); | ||
73 | + this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); | ||
74 | + this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs()); | ||
75 | + this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); | ||
64 | } | 76 | } |
65 | 77 | ||
66 | @Override | 78 | @Override |
67 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { | 79 | public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() { |
68 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 80 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
69 | } | 81 | } |
70 | 82 | ||
71 | @Override | 83 | @Override |
72 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { | 84 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
73 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 85 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
74 | } | 86 | } |
75 | 87 | ||
76 | @Override | 88 | @Override |
77 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { | 89 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() { |
78 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic()); | 90 | + return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic()); |
79 | } | 91 | } |
80 | 92 | ||
81 | @Override | 93 | @Override |
82 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { | 94 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
83 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 95 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
84 | } | 96 | } |
85 | 97 | ||
86 | @Override | 98 | @Override |
87 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { | 99 | public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() { |
88 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic()); | 100 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
89 | } | 101 | } |
90 | 102 | ||
91 | @Override | 103 | @Override |
92 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { | 104 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) { |
93 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic(), | 105 | + return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(), |
94 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); | 106 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); |
95 | } | 107 | } |
96 | 108 | ||
97 | @Override | 109 | @Override |
98 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { | 110 | public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() { |
99 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, | 111 | + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, |
100 | partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), | 112 | partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), |
101 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); | 113 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); |
102 | } | 114 | } |
@@ -105,4 +117,20 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | @@ -105,4 +117,20 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor | ||
105 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { | 117 | public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() { |
106 | return null; | 118 | return null; |
107 | } | 119 | } |
120 | + | ||
121 | + @PreDestroy | ||
122 | + private void destroy() { | ||
123 | + if (coreAdmin != null) { | ||
124 | + coreAdmin.destroy(); | ||
125 | + } | ||
126 | + if (ruleEngineAdmin != null) { | ||
127 | + ruleEngineAdmin.destroy(); | ||
128 | + } | ||
129 | + if (jsExecutorAdmin != null) { | ||
130 | + jsExecutorAdmin.destroy(); | ||
131 | + } | ||
132 | + if (notificationAdmin != null) { | ||
133 | + notificationAdmin.destroy(); | ||
134 | + } | ||
135 | + } | ||
108 | } | 136 | } |
@@ -30,12 +30,17 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; | @@ -30,12 +30,17 @@ import org.thingsboard.server.queue.TbQueueRequestTemplate; | ||
30 | import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | 30 | import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; |
31 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 31 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
32 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 32 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
33 | +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqAdmin; | ||
33 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; | 34 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate; |
34 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; | 35 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate; |
36 | +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments; | ||
35 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; | 37 | import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings; |
38 | +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | ||
36 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | 39 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
37 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; | 40 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
38 | 41 | ||
42 | +import javax.annotation.PreDestroy; | ||
43 | + | ||
39 | @Component | 44 | @Component |
40 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") | 45 | @ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") |
41 | @Slf4j | 46 | @Slf4j |
@@ -43,34 +48,45 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory { | @@ -43,34 +48,45 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory { | ||
43 | private final TbQueueTransportApiSettings transportApiSettings; | 48 | private final TbQueueTransportApiSettings transportApiSettings; |
44 | private final TbQueueTransportNotificationSettings transportNotificationSettings; | 49 | private final TbQueueTransportNotificationSettings transportNotificationSettings; |
45 | private final TbRabbitMqSettings rabbitMqSettings; | 50 | private final TbRabbitMqSettings rabbitMqSettings; |
46 | - private final TbQueueAdmin admin; | ||
47 | private final TbServiceInfoProvider serviceInfoProvider; | 51 | private final TbServiceInfoProvider serviceInfoProvider; |
52 | + private final TbQueueCoreSettings coreSettings; | ||
53 | + | ||
54 | + private final TbQueueAdmin coreAdmin; | ||
55 | + private final TbQueueAdmin ruleEngineAdmin; | ||
56 | + private final TbQueueAdmin transportApiAdmin; | ||
57 | + private final TbQueueAdmin notificationAdmin; | ||
48 | 58 | ||
49 | public RabbitMqTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, | 59 | public RabbitMqTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, |
50 | TbQueueTransportNotificationSettings transportNotificationSettings, | 60 | TbQueueTransportNotificationSettings transportNotificationSettings, |
51 | TbRabbitMqSettings rabbitMqSettings, | 61 | TbRabbitMqSettings rabbitMqSettings, |
52 | TbServiceInfoProvider serviceInfoProvider, | 62 | TbServiceInfoProvider serviceInfoProvider, |
53 | - TbQueueAdmin admin) { | 63 | + TbQueueCoreSettings coreSettings, |
64 | + TbRabbitMqQueueArguments queueArguments) { | ||
54 | this.transportApiSettings = transportApiSettings; | 65 | this.transportApiSettings = transportApiSettings; |
55 | this.transportNotificationSettings = transportNotificationSettings; | 66 | this.transportNotificationSettings = transportNotificationSettings; |
56 | this.rabbitMqSettings = rabbitMqSettings; | 67 | this.rabbitMqSettings = rabbitMqSettings; |
57 | - this.admin = admin; | ||
58 | this.serviceInfoProvider = serviceInfoProvider; | 68 | this.serviceInfoProvider = serviceInfoProvider; |
69 | + this.coreSettings = coreSettings; | ||
70 | + | ||
71 | + this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs()); | ||
72 | + this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs()); | ||
73 | + this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs()); | ||
74 | + this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs()); | ||
59 | } | 75 | } |
60 | 76 | ||
61 | @Override | 77 | @Override |
62 | public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() { | 78 | public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() { |
63 | TbRabbitMqProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate = | 79 | TbRabbitMqProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate = |
64 | - new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); | 80 | + new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); |
65 | 81 | ||
66 | TbRabbitMqConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate = | 82 | TbRabbitMqConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate = |
67 | - new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, | 83 | + new TbRabbitMqConsumerTemplate<>(transportApiAdmin, rabbitMqSettings, |
68 | transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), | 84 | transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), |
69 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); | 85 | msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); |
70 | 86 | ||
71 | DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder | 87 | DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder |
72 | <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); | 88 | <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); |
73 | - templateBuilder.queueAdmin(admin); | 89 | + templateBuilder.queueAdmin(transportApiAdmin); |
74 | templateBuilder.requestTemplate(producerTemplate); | 90 | templateBuilder.requestTemplate(producerTemplate); |
75 | templateBuilder.responseTemplate(consumerTemplate); | 91 | templateBuilder.responseTemplate(consumerTemplate); |
76 | templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); | 92 | templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); |
@@ -81,17 +97,33 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory { | @@ -81,17 +97,33 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory { | ||
81 | 97 | ||
82 | @Override | 98 | @Override |
83 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { | 99 | public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() { |
84 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); | 100 | + return new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); |
85 | } | 101 | } |
86 | 102 | ||
87 | @Override | 103 | @Override |
88 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { | 104 | public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() { |
89 | - return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic()); | 105 | + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic()); |
90 | } | 106 | } |
91 | 107 | ||
92 | @Override | 108 | @Override |
93 | public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() { | 109 | public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() { |
94 | - return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), | 110 | + return new TbRabbitMqConsumerTemplate<>(notificationAdmin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), |
95 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); | 111 | msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); |
96 | } | 112 | } |
113 | + | ||
114 | + @PreDestroy | ||
115 | + private void destroy() { | ||
116 | + if (coreAdmin != null) { | ||
117 | + coreAdmin.destroy(); | ||
118 | + } | ||
119 | + if (ruleEngineAdmin != null) { | ||
120 | + ruleEngineAdmin.destroy(); | ||
121 | + } | ||
122 | + if (transportApiAdmin != null) { | ||
123 | + transportApiAdmin.destroy(); | ||
124 | + } | ||
125 | + if (notificationAdmin != null) { | ||
126 | + notificationAdmin.destroy(); | ||
127 | + } | ||
128 | + } | ||
97 | } | 129 | } |
@@ -29,6 +29,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | @@ -29,6 +29,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | ||
29 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; | 29 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusConsumerTemplate; |
30 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; | 30 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusProducerTemplate; |
31 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; | 31 | import org.thingsboard.server.queue.azure.servicebus.TbServiceBusSettings; |
32 | +import org.thingsboard.server.queue.settings.TbQueueCoreSettings; | ||
32 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; | 33 | import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings; |
33 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; | 34 | import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings; |
34 | 35 | ||
@@ -41,17 +42,20 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory | @@ -41,17 +42,20 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory | ||
41 | private final TbServiceBusSettings serviceBusSettings; | 42 | private final TbServiceBusSettings serviceBusSettings; |
42 | private final TbQueueAdmin admin; | 43 | private final TbQueueAdmin admin; |
43 | private final TbServiceInfoProvider serviceInfoProvider; | 44 | private final TbServiceInfoProvider serviceInfoProvider; |
45 | + private final TbQueueCoreSettings coreSettings; | ||
44 | 46 | ||
45 | public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, | 47 | public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings, |
46 | TbQueueTransportNotificationSettings transportNotificationSettings, | 48 | TbQueueTransportNotificationSettings transportNotificationSettings, |
47 | TbServiceBusSettings serviceBusSettings, | 49 | TbServiceBusSettings serviceBusSettings, |
48 | TbServiceInfoProvider serviceInfoProvider, | 50 | TbServiceInfoProvider serviceInfoProvider, |
51 | + TbQueueCoreSettings coreSettings, | ||
49 | TbQueueAdmin admin) { | 52 | TbQueueAdmin admin) { |
50 | this.transportApiSettings = transportApiSettings; | 53 | this.transportApiSettings = transportApiSettings; |
51 | this.transportNotificationSettings = transportNotificationSettings; | 54 | this.transportNotificationSettings = transportNotificationSettings; |
52 | this.serviceBusSettings = serviceBusSettings; | 55 | this.serviceBusSettings = serviceBusSettings; |
53 | this.admin = admin; | 56 | this.admin = admin; |
54 | this.serviceInfoProvider = serviceInfoProvider; | 57 | this.serviceInfoProvider = serviceInfoProvider; |
58 | + this.coreSettings = coreSettings; | ||
55 | } | 59 | } |
56 | 60 | ||
57 | @Override | 61 | @Override |
@@ -82,7 +86,7 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory | @@ -82,7 +86,7 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory | ||
82 | 86 | ||
83 | @Override | 87 | @Override |
84 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() { | 88 | public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() { |
85 | - return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, transportApiSettings.getRequestsTopic()); | 89 | + return new TbServiceBusProducerTemplate<>(admin, serviceBusSettings, coreSettings.getTopic()); |
86 | } | 90 | } |
87 | 91 | ||
88 | @Override | 92 | @Override |
@@ -18,24 +18,23 @@ package org.thingsboard.server.queue.rabbitmq; | @@ -18,24 +18,23 @@ package org.thingsboard.server.queue.rabbitmq; | ||
18 | import com.rabbitmq.client.Channel; | 18 | import com.rabbitmq.client.Channel; |
19 | import com.rabbitmq.client.Connection; | 19 | import com.rabbitmq.client.Connection; |
20 | import lombok.extern.slf4j.Slf4j; | 20 | import lombok.extern.slf4j.Slf4j; |
21 | -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | ||
22 | -import org.springframework.stereotype.Component; | ||
23 | import org.thingsboard.server.queue.TbQueueAdmin; | 21 | import org.thingsboard.server.queue.TbQueueAdmin; |
24 | 22 | ||
25 | import java.io.IOException; | 23 | import java.io.IOException; |
24 | +import java.util.Map; | ||
26 | import java.util.concurrent.TimeoutException; | 25 | import java.util.concurrent.TimeoutException; |
27 | 26 | ||
28 | @Slf4j | 27 | @Slf4j |
29 | -@Component | ||
30 | -@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'") | ||
31 | public class TbRabbitMqAdmin implements TbQueueAdmin { | 28 | public class TbRabbitMqAdmin implements TbQueueAdmin { |
32 | 29 | ||
33 | private final TbRabbitMqSettings rabbitMqSettings; | 30 | private final TbRabbitMqSettings rabbitMqSettings; |
34 | private final Channel channel; | 31 | private final Channel channel; |
35 | private final Connection connection; | 32 | private final Connection connection; |
33 | + private final Map<String, Object> arguments; | ||
36 | 34 | ||
37 | - public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings) { | 35 | + public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings, Map<String, Object> arguments) { |
38 | this.rabbitMqSettings = rabbitMqSettings; | 36 | this.rabbitMqSettings = rabbitMqSettings; |
37 | + this.arguments = arguments; | ||
39 | 38 | ||
40 | try { | 39 | try { |
41 | connection = rabbitMqSettings.getConnectionFactory().newConnection(); | 40 | connection = rabbitMqSettings.getConnectionFactory().newConnection(); |
@@ -55,7 +54,7 @@ public class TbRabbitMqAdmin implements TbQueueAdmin { | @@ -55,7 +54,7 @@ public class TbRabbitMqAdmin implements TbQueueAdmin { | ||
55 | @Override | 54 | @Override |
56 | public void createTopicIfNotExists(String topic) { | 55 | public void createTopicIfNotExists(String topic) { |
57 | try { | 56 | try { |
58 | - channel.queueDeclare(topic, false, false, false, null); | 57 | + channel.queueDeclare(topic, false, false, false, arguments); |
59 | } catch (IOException e) { | 58 | } catch (IOException e) { |
60 | log.error("Failed to bind queue: [{}]", topic, e); | 59 | log.error("Failed to bind queue: [{}]", topic, e); |
61 | } | 60 | } |
common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.rabbitmq; | ||
17 | + | ||
18 | +import lombok.Getter; | ||
19 | +import org.springframework.beans.factory.annotation.Value; | ||
20 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | ||
21 | +import org.springframework.stereotype.Component; | ||
22 | + | ||
23 | +import javax.annotation.PostConstruct; | ||
24 | +import java.util.HashMap; | ||
25 | +import java.util.Map; | ||
26 | +import java.util.regex.Pattern; | ||
27 | + | ||
28 | +@Component | ||
29 | +@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'") | ||
30 | +public class TbRabbitMqQueueArguments { | ||
31 | + @Value("${queue.rabbitmq.queue-properties.core}") | ||
32 | + private String coreProperties; | ||
33 | + @Value("${queue.rabbitmq.queue-properties.rule-engine}") | ||
34 | + private String ruleEngineProperties; | ||
35 | + @Value("${queue.rabbitmq.queue-properties.transport-api}") | ||
36 | + private String transportApiProperties; | ||
37 | + @Value("${queue.rabbitmq.queue-properties.notifications}") | ||
38 | + private String notificationsProperties; | ||
39 | + @Value("${queue.rabbitmq.queue-properties.js-executor}") | ||
40 | + private String jsExecutorProperties; | ||
41 | + | ||
42 | + @Getter | ||
43 | + private Map<String, Object> coreArgs; | ||
44 | + @Getter | ||
45 | + private Map<String, Object> ruleEngineArgs; | ||
46 | + @Getter | ||
47 | + private Map<String, Object> transportApiArgs; | ||
48 | + @Getter | ||
49 | + private Map<String, Object> notificationsArgs; | ||
50 | + @Getter | ||
51 | + private Map<String, Object> jsExecutorArgs; | ||
52 | + | ||
53 | + @PostConstruct | ||
54 | + private void init() { | ||
55 | + coreArgs = getArgs(coreProperties); | ||
56 | + ruleEngineArgs = getArgs(ruleEngineProperties); | ||
57 | + transportApiArgs = getArgs(transportApiProperties); | ||
58 | + notificationsArgs = getArgs(notificationsProperties); | ||
59 | + jsExecutorArgs = getArgs(jsExecutorProperties); | ||
60 | + } | ||
61 | + | ||
62 | + private Map<String, Object> getArgs(String properties) { | ||
63 | + Map<String, Object> configs = new HashMap<>(); | ||
64 | + for (String property : properties.split(";")) { | ||
65 | + int delimiterPosition = property.indexOf(":"); | ||
66 | + String key = property.substring(0, delimiterPosition); | ||
67 | + String strValue = property.substring(delimiterPosition + 1); | ||
68 | + configs.put(key, getObjectValue(strValue)); | ||
69 | + } | ||
70 | + return configs; | ||
71 | + } | ||
72 | + | ||
73 | + private Object getObjectValue(String str) { | ||
74 | + if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("false")) { | ||
75 | + return Boolean.valueOf(str); | ||
76 | + } else if (isNumeric(str)) { | ||
77 | + return getNumericValue(str); | ||
78 | + } | ||
79 | + return str; | ||
80 | + } | ||
81 | + | ||
82 | + private Object getNumericValue(String str) { | ||
83 | + if (str.contains(".")) { | ||
84 | + return Double.valueOf(str); | ||
85 | + } else { | ||
86 | + return Long.valueOf(str); | ||
87 | + } | ||
88 | + } | ||
89 | + | ||
90 | + private static final Pattern PATTERN = Pattern.compile("-?\\d+(\\.\\d+)?"); | ||
91 | + | ||
92 | + public boolean isNumeric(String strNum) { | ||
93 | + if (strNum == null) { | ||
94 | + return false; | ||
95 | + } | ||
96 | + return PATTERN.matcher(strNum).matches(); | ||
97 | + } | ||
98 | +} |
@@ -95,6 +95,12 @@ queue: | @@ -95,6 +95,12 @@ queue: | ||
95 | automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" | 95 | automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" |
96 | connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" | 96 | connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" |
97 | handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" | 97 | handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" |
98 | + queue-properties: | ||
99 | + rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
100 | + core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
101 | + transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
102 | + notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
103 | + js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
98 | partitions: | 104 | partitions: |
99 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" | 105 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" |
100 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" | 106 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" |
@@ -96,6 +96,12 @@ queue: | @@ -96,6 +96,12 @@ queue: | ||
96 | automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" | 96 | automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" |
97 | connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" | 97 | connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" |
98 | handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" | 98 | handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" |
99 | + queue-properties: | ||
100 | + rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
101 | + core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
102 | + transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
103 | + notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
104 | + js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
99 | partitions: | 105 | partitions: |
100 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" | 106 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" |
101 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" | 107 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" |
@@ -126,6 +126,12 @@ queue: | @@ -126,6 +126,12 @@ queue: | ||
126 | automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" | 126 | automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}" |
127 | connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" | 127 | connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}" |
128 | handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" | 128 | handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}" |
129 | + queue-properties: | ||
130 | + rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
131 | + core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
132 | + transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
133 | + notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
134 | + js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" | ||
129 | partitions: | 135 | partitions: |
130 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" | 136 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" |
131 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" | 137 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" |