Commit 7b3d47526792fd5a26fdaa9233afcc69f110ed5b

Authored by Yevhen Bondarenko
Committed by GitHub
1 parent ca193239

[2.5] created rabbitmq queue (#2589)

* created main classes for RabbitMq queue

* created temp version rabbitmq

* Merge branch 'develop/2.5' of https://github.com/thingsboard/thingsboard into develop/2.5-rabbitmq

# Conflicts:
#	common/queue/pom.xml

* rabbit improvements
... ... @@ -119,6 +119,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
119 119 if (submitExecutor != null) {
120 120 submitExecutor.shutdownNow();
121 121 }
  122 +
  123 + ruleEngineSettings.getQueues().forEach(config -> consumerConfigurations.put(config.getName(), config));
122 124 }
123 125
124 126 @Override
... ...
... ... @@ -550,6 +550,16 @@ queue:
550 550 sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
551 551 sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
552 552 max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
  553 + rabbitmq:
  554 + exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}"
  555 + host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}"
  556 + port: "${TB_QUEUE_RABBIT_MQ_PORT:5672}"
  557 + virtual_host: "${TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST:/}"
  558 + username: "${TB_QUEUE_RABBIT_MQ_USERNAME:YOUR_USERNAME}"
  559 + password: "${TB_QUEUE_RABBIT_MQ_PASSWORD:YOUR_PASSWORD}"
  560 + automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}"
  561 + connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}"
  562 + handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}"
553 563 partitions:
554 564 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
555 565 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
... ...
... ... @@ -65,6 +65,10 @@
65 65 <artifactId>azure-servicebus</artifactId>
66 66 </dependency>
67 67 <dependency>
  68 + <groupId>com.rabbitmq</groupId>
  69 + <artifactId>amqp-client</artifactId>
  70 + </dependency>
  71 + <dependency>
68 72 <groupId>org.springframework</groupId>
69 73 <artifactId>spring-context-support</artifactId>
70 74 </dependency>
... ...
... ... @@ -17,7 +17,6 @@ package org.thingsboard.server.queue.common;
17 17
18 18 import lombok.Data;
19 19 import org.thingsboard.server.queue.TbQueueMsg;
20   -import org.thingsboard.server.queue.TbQueueMsgHeaders;
21 20
22 21 import java.util.UUID;
23 22
... ... @@ -25,13 +24,7 @@ import java.util.UUID;
25 24 public class DefaultTbQueueMsg implements TbQueueMsg {
26 25 private final UUID key;
27 26 private final byte[] data;
28   - private DefaultTbQueueMsgHeaders headers;
29   -
30   -
31   - public DefaultTbQueueMsg(UUID key, byte[] data) {
32   - this.key = key;
33   - this.data = data;
34   - }
  27 + private final DefaultTbQueueMsgHeaders headers;
35 28
36 29 public DefaultTbQueueMsg(TbQueueMsg msg) {
37 30 this.key = msg.getKey();
... ...
... ... @@ -20,7 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
20 20 import com.google.common.util.concurrent.SettableFuture;
21 21 import lombok.Builder;
22 22 import lombok.extern.slf4j.Slf4j;
23   -import org.apache.kafka.common.errors.InterruptException;
  23 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
24 24 import org.thingsboard.server.queue.TbQueueAdmin;
25 25 import org.thingsboard.server.queue.TbQueueCallback;
26 26 import org.thingsboard.server.queue.TbQueueConsumer;
... ... @@ -28,7 +28,6 @@ import org.thingsboard.server.queue.TbQueueMsg;
28 28 import org.thingsboard.server.queue.TbQueueMsgMetadata;
29 29 import org.thingsboard.server.queue.TbQueueProducer;
30 30 import org.thingsboard.server.queue.TbQueueRequestTemplate;
31   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
32 31
33 32 import java.util.List;
34 33 import java.util.UUID;
... ... @@ -128,10 +127,6 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
128 127 });
129 128 nextCleanupMs = tickTs + maxRequestTimeout;
130 129 }
131   - } catch (InterruptException ie) {
132   - if (!stopped) {
133   - log.warn("Fetching data from kafka was interrupted.", ie);
134   - }
135 130 } catch (Throwable e) {
136 131 log.warn("Failed to obtain responses from queue.", e);
137 132 try {
... ...
... ... @@ -17,7 +17,6 @@ package org.thingsboard.server.queue.common;
17 17
18 18 import lombok.Builder;
19 19 import lombok.extern.slf4j.Slf4j;
20   -import org.apache.kafka.common.errors.InterruptException;
21 20 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
22 21 import org.thingsboard.server.queue.TbQueueConsumer;
23 22 import org.thingsboard.server.queue.TbQueueHandler;
... ... @@ -133,10 +132,6 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
133 132 }
134 133 });
135 134 requestTemplate.commit();
136   - } catch (InterruptException ie) {
137   - if (!stopped) {
138   - log.warn("Fetching data from queue was interrupted.", ie);
139   - }
140 135 } catch (Throwable e) {
141 136 log.warn("Failed to obtain messages from queue.", e);
142 137 try {
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.msg.queue.ServiceType;
  21 +import org.thingsboard.server.gen.transport.TransportProtos;
  22 +import org.thingsboard.server.queue.TbQueueAdmin;
  23 +import org.thingsboard.server.queue.TbQueueConsumer;
  24 +import org.thingsboard.server.queue.TbQueueProducer;
  25 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  26 +import org.thingsboard.server.queue.discovery.PartitionService;
  27 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  28 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate;
  29 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate;
  30 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
  31 +import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  32 +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
  33 +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
  34 +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
  35 +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
  36 +
  37 +@Component
  38 +@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='monolith'")
  39 +public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngineQueueFactory {
  40 +
  41 + private final PartitionService partitionService;
  42 + private final TbQueueCoreSettings coreSettings;
  43 + private final TbServiceInfoProvider serviceInfoProvider;
  44 + private final TbQueueRuleEngineSettings ruleEngineSettings;
  45 + private final TbQueueTransportApiSettings transportApiSettings;
  46 + private final TbQueueTransportNotificationSettings transportNotificationSettings;
  47 + private final TbRabbitMqSettings rabbitMqSettings;
  48 + private final TbQueueAdmin admin;
  49 +
  50 + public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
  51 + TbQueueRuleEngineSettings ruleEngineSettings,
  52 + TbServiceInfoProvider serviceInfoProvider,
  53 + TbQueueTransportApiSettings transportApiSettings,
  54 + TbQueueTransportNotificationSettings transportNotificationSettings,
  55 + TbRabbitMqSettings rabbitMqSettings,
  56 + TbQueueAdmin admin) {
  57 + this.partitionService = partitionService;
  58 + this.coreSettings = coreSettings;
  59 + this.serviceInfoProvider = serviceInfoProvider;
  60 + this.ruleEngineSettings = ruleEngineSettings;
  61 + this.transportApiSettings = transportApiSettings;
  62 + this.transportNotificationSettings = transportNotificationSettings;
  63 + this.rabbitMqSettings = rabbitMqSettings;
  64 + this.admin = admin;
  65 + }
  66 +
  67 + @Override
  68 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsMsgProducer() {
  69 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic());
  70 + }
  71 +
  72 + @Override
  73 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() {
  74 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic());
  75 + }
  76 +
  77 + @Override
  78 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
  79 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic());
  80 + }
  81 +
  82 + @Override
  83 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() {
  84 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  85 + }
  86 +
  87 + @Override
  88 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
  89 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  90 + }
  91 +
  92 + @Override
  93 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
  94 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic(),
  95 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
  96 + }
  97 +
  98 + @Override
  99 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
  100 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings,
  101 + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
  102 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  103 + }
  104 +
  105 + @Override
  106 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createToCoreMsgConsumer() {
  107 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic(),
  108 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
  109 + }
  110 +
  111 + @Override
  112 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
  113 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings,
  114 + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
  115 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  116 + }
  117 +
  118 + @Override
  119 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {
  120 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic(),
  121 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
  122 + }
  123 +
  124 + @Override
  125 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiResponseProducer() {
  126 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getResponsesTopic());
  127 + }
  128 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.msg.queue.ServiceType;
  21 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  22 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  24 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
  27 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  28 +import org.thingsboard.server.queue.TbQueueAdmin;
  29 +import org.thingsboard.server.queue.TbQueueConsumer;
  30 +import org.thingsboard.server.queue.TbQueueProducer;
  31 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  32 +import org.thingsboard.server.queue.discovery.PartitionService;
  33 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  34 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate;
  35 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate;
  36 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
  37 +import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  38 +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
  39 +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
  40 +
  41 +@Component
  42 +@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-core'")
  43 +public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
  44 +
  45 + private final TbRabbitMqSettings rabbitMqSettings;
  46 + private final TbQueueRuleEngineSettings ruleEngineSettings;
  47 + private final TbQueueCoreSettings coreSettings;
  48 + private final TbQueueTransportApiSettings transportApiSettings;
  49 + private final PartitionService partitionService;
  50 + private final TbServiceInfoProvider serviceInfoProvider;
  51 + private final TbQueueAdmin admin;
  52 +
  53 + public RabbitMqTbCoreQueueFactory(TbRabbitMqSettings rabbitMqSettings,
  54 + TbQueueCoreSettings coreSettings,
  55 + TbQueueTransportApiSettings transportApiSettings,
  56 + TbQueueRuleEngineSettings ruleEngineSettings,
  57 + PartitionService partitionService,
  58 + TbServiceInfoProvider serviceInfoProvider,
  59 + TbQueueAdmin admin) {
  60 + this.rabbitMqSettings = rabbitMqSettings;
  61 + this.coreSettings = coreSettings;
  62 + this.transportApiSettings = transportApiSettings;
  63 + this.ruleEngineSettings = ruleEngineSettings;
  64 + this.partitionService = partitionService;
  65 + this.serviceInfoProvider = serviceInfoProvider;
  66 + this.admin = admin;
  67 + }
  68 +
  69 + @Override
  70 + public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
  71 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  72 + }
  73 +
  74 + @Override
  75 + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
  76 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  77 + }
  78 +
  79 + @Override
  80 + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
  81 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic());
  82 + }
  83 +
  84 + @Override
  85 + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
  86 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  87 + }
  88 +
  89 + @Override
  90 + public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
  91 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  92 + }
  93 +
  94 + @Override
  95 + public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> createToCoreMsgConsumer() {
  96 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic(),
  97 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
  98 + }
  99 +
  100 + @Override
  101 + public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
  102 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings,
  103 + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(),
  104 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  105 + }
  106 +
  107 + @Override
  108 + public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportApiRequestConsumer() {
  109 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic(),
  110 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()));
  111 + }
  112 +
  113 + @Override
  114 + public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
  115 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  116 + }
  117 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  19 +import org.springframework.stereotype.Component;
  20 +import org.thingsboard.server.common.msg.queue.ServiceType;
  21 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  22 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  24 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  26 +import org.thingsboard.server.queue.TbQueueAdmin;
  27 +import org.thingsboard.server.queue.TbQueueConsumer;
  28 +import org.thingsboard.server.queue.TbQueueProducer;
  29 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  30 +import org.thingsboard.server.queue.discovery.PartitionService;
  31 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  32 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate;
  33 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate;
  34 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
  35 +import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
  36 +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
  37 +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
  38 +
  39 +@Component
  40 +@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && '${service.type:null}'=='tb-rule-engine'")
  41 +public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
  42 +
  43 + private final PartitionService partitionService;
  44 + private final TbQueueCoreSettings coreSettings;
  45 + private final TbServiceInfoProvider serviceInfoProvider;
  46 + private final TbQueueRuleEngineSettings ruleEngineSettings;
  47 + private final TbRabbitMqSettings rabbitMqSettings;
  48 + private final TbQueueAdmin admin;
  49 +
  50 + public RabbitMqTbRuleEngineQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
  51 + TbQueueRuleEngineSettings ruleEngineSettings,
  52 + TbServiceInfoProvider serviceInfoProvider,
  53 + TbRabbitMqSettings rabbitMqSettings,
  54 + TbQueueAdmin admin) {
  55 + this.partitionService = partitionService;
  56 + this.coreSettings = coreSettings;
  57 + this.serviceInfoProvider = serviceInfoProvider;
  58 + this.ruleEngineSettings = ruleEngineSettings;
  59 + this.rabbitMqSettings = rabbitMqSettings;
  60 + this.admin = admin;
  61 + }
  62 +
  63 + @Override
  64 + public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
  65 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  66 + }
  67 +
  68 + @Override
  69 + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
  70 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  71 + }
  72 +
  73 + @Override
  74 + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
  75 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic());
  76 + }
  77 +
  78 + @Override
  79 + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
  80 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  81 + }
  82 +
  83 + @Override
  84 + public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
  85 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, coreSettings.getTopic());
  86 + }
  87 +
  88 + @Override
  89 + public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
  90 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, ruleEngineSettings.getTopic(),
  91 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
  92 + }
  93 +
  94 + @Override
  95 + public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
  96 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings,
  97 + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(),
  98 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
  99 + }
  100 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.provider;
  17 +
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  20 +import org.springframework.stereotype.Component;
  21 +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
  22 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  24 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  26 +import org.thingsboard.server.queue.TbQueueAdmin;
  27 +import org.thingsboard.server.queue.TbQueueConsumer;
  28 +import org.thingsboard.server.queue.TbQueueProducer;
  29 +import org.thingsboard.server.queue.TbQueueRequestTemplate;
  30 +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
  31 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  32 +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
  33 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqConsumerTemplate;
  34 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate;
  35 +import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
  36 +import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
  37 +import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
  38 +
  39 +@Component
  40 +@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')")
  41 +@Slf4j
  42 +public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
  43 + private final TbQueueTransportApiSettings transportApiSettings;
  44 + private final TbQueueTransportNotificationSettings transportNotificationSettings;
  45 + private final TbRabbitMqSettings rabbitMqSettings;
  46 + private final TbQueueAdmin admin;
  47 + private final TbServiceInfoProvider serviceInfoProvider;
  48 +
  49 + public RabbitMqTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
  50 + TbQueueTransportNotificationSettings transportNotificationSettings,
  51 + TbRabbitMqSettings rabbitMqSettings,
  52 + TbServiceInfoProvider serviceInfoProvider,
  53 + TbQueueAdmin admin) {
  54 + this.transportApiSettings = transportApiSettings;
  55 + this.transportNotificationSettings = transportNotificationSettings;
  56 + this.rabbitMqSettings = rabbitMqSettings;
  57 + this.admin = admin;
  58 + this.serviceInfoProvider = serviceInfoProvider;
  59 + }
  60 +
  61 + @Override
  62 + public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
  63 + TbRabbitMqProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
  64 + new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic());
  65 +
  66 + TbRabbitMqConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
  67 + new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings,
  68 + transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(),
  69 + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
  70 +
  71 + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
  72 + <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
  73 + templateBuilder.queueAdmin(admin);
  74 + templateBuilder.requestTemplate(producerTemplate);
  75 + templateBuilder.responseTemplate(consumerTemplate);
  76 + templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
  77 + templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout());
  78 + templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval());
  79 + return templateBuilder.build();
  80 + }
  81 +
  82 + @Override
  83 + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
  84 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic());
  85 + }
  86 +
  87 + @Override
  88 + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
  89 + return new TbRabbitMqProducerTemplate<>(admin, rabbitMqSettings, transportApiSettings.getRequestsTopic());
  90 + }
  91 +
  92 + @Override
  93 + public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsConsumer() {
  94 + return new TbRabbitMqConsumerTemplate<>(admin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(),
  95 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders()));
  96 + }
  97 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.rabbitmq;
  17 +import com.rabbitmq.client.Channel;
  18 +import com.rabbitmq.client.Connection;
  19 +
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  22 +import org.springframework.stereotype.Component;
  23 +import org.thingsboard.server.queue.TbQueueAdmin;
  24 +
  25 +import javax.annotation.PreDestroy;
  26 +import java.io.IOException;
  27 +import java.util.concurrent.TimeoutException;
  28 +
  29 +@Slf4j
  30 +@Component
  31 +@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
  32 +public class TbRabbitMqAdmin implements TbQueueAdmin {
  33 +
  34 + private final TbRabbitMqSettings rabbitMqSettings;
  35 + private final Channel channel;
  36 + private final Connection connection;
  37 +
  38 + public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings) {
  39 + this.rabbitMqSettings = rabbitMqSettings;
  40 +
  41 + try {
  42 + connection = rabbitMqSettings.getConnectionFactory().newConnection();
  43 + } catch (IOException | TimeoutException e) {
  44 + log.error("Failed to create connection.", e);
  45 + throw new RuntimeException("Failed to create connection.", e);
  46 + }
  47 +
  48 + try {
  49 + channel = connection.createChannel();
  50 + } catch (IOException e) {
  51 + log.error("Failed to create chanel.", e);
  52 + throw new RuntimeException("Failed to create chanel.", e);
  53 + }
  54 + }
  55 +
  56 + @Override
  57 + public void createTopicIfNotExists(String topic) {
  58 + try {
  59 + channel.queueDeclare(topic, false, false, false, null);
  60 + } catch (IOException e) {
  61 + log.error("Failed to bind queue: [{}]", topic, e);
  62 + }
  63 + }
  64 +
  65 + @PreDestroy
  66 + private void destroy() {
  67 + if (channel != null) {
  68 + try {
  69 + channel.close();
  70 + } catch (IOException | TimeoutException e) {
  71 + log.error("Failed to close Chanel.", e);
  72 + }
  73 + }
  74 + if (connection != null) {
  75 + try {
  76 + connection.close();
  77 + } catch (IOException e) {
  78 + log.error("Failed to close Connection.", e);
  79 + }
  80 + }
  81 + }
  82 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.rabbitmq;
  17 +
  18 +import com.google.gson.Gson;
  19 +import com.google.protobuf.InvalidProtocolBufferException;
  20 +import com.rabbitmq.client.Channel;
  21 +import com.rabbitmq.client.Connection;
  22 +import com.rabbitmq.client.GetResponse;
  23 +import lombok.extern.slf4j.Slf4j;
  24 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  25 +import org.thingsboard.server.queue.TbQueueAdmin;
  26 +import org.thingsboard.server.queue.TbQueueConsumer;
  27 +import org.thingsboard.server.queue.TbQueueMsg;
  28 +import org.thingsboard.server.queue.TbQueueMsgDecoder;
  29 +import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
  30 +
  31 +import java.io.IOException;
  32 +import java.util.Collections;
  33 +import java.util.List;
  34 +import java.util.Objects;
  35 +import java.util.Set;
  36 +import java.util.concurrent.TimeoutException;
  37 +import java.util.stream.Collectors;
  38 +
  39 +@Slf4j
  40 +public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
  41 +
  42 + private final Gson gson = new Gson();
  43 + private final TbQueueAdmin admin;
  44 + private final String topic;
  45 + private final TbQueueMsgDecoder<T> decoder;
  46 + private final TbRabbitMqSettings rabbitMqSettings;
  47 + private final Channel channel;
  48 + private final Connection connection;
  49 +
  50 + private volatile Set<TopicPartitionInfo> partitions;
  51 + private volatile boolean subscribed;
  52 + private volatile Set<String> queues;
  53 + private volatile boolean stopped;
  54 +
  55 + public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  56 + this.admin = admin;
  57 + this.decoder = decoder;
  58 + this.topic = topic;
  59 + this.rabbitMqSettings = rabbitMqSettings;
  60 + try {
  61 + connection = rabbitMqSettings.getConnectionFactory().newConnection();
  62 + } catch (IOException | TimeoutException e) {
  63 + log.error("Failed to create connection.", e);
  64 + throw new RuntimeException("Failed to create connection.", e);
  65 + }
  66 +
  67 + try {
  68 + channel = connection.createChannel();
  69 + } catch (IOException e) {
  70 + log.error("Failed to create chanel.", e);
  71 + throw new RuntimeException("Failed to create chanel.", e);
  72 + }
  73 + stopped = false;
  74 + }
  75 +
  76 + @Override
  77 + public String getTopic() {
  78 + return topic;
  79 + }
  80 +
  81 + @Override
  82 + public void subscribe() {
  83 + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
  84 + subscribed = false;
  85 + }
  86 +
  87 + @Override
  88 + public void subscribe(Set<TopicPartitionInfo> partitions) {
  89 + this.partitions = partitions;
  90 + subscribed = false;
  91 + }
  92 +
  93 + @Override
  94 + public void unsubscribe() {
  95 + stopped = true;
  96 + if (channel != null) {
  97 + try {
  98 + channel.close();
  99 + } catch (IOException | TimeoutException e) {
  100 + log.error("Failed to close the channel.");
  101 + }
  102 + }
  103 + if (connection != null) {
  104 + try {
  105 + connection.close();
  106 + } catch (IOException e) {
  107 + log.error("Failed to close the connection.");
  108 + }
  109 + }
  110 + }
  111 +
  112 + @Override
  113 + public List<T> poll(long durationInMillis) {
  114 + if (!subscribed && partitions == null) {
  115 + try {
  116 + Thread.sleep(durationInMillis);
  117 + } catch (InterruptedException e) {
  118 + log.debug("Failed to await subscription", e);
  119 + }
  120 + } else {
  121 + if (!subscribed) {
  122 + queues = partitions.stream()
  123 + .map(TopicPartitionInfo::getFullTopicName)
  124 + .collect(Collectors.toSet());
  125 +
  126 + queues.forEach(admin::createTopicIfNotExists);
  127 + subscribed = true;
  128 + }
  129 +
  130 + List<T> result = queues.stream()
  131 + .map(queue -> {
  132 + try {
  133 + return channel.basicGet(queue, false);
  134 + } catch (IOException e) {
  135 + log.error("Failed to get messages from queue: [{}]", queue);
  136 + throw new RuntimeException("Failed to get messages from queue.", e);
  137 + }
  138 + }).filter(Objects::nonNull).map(message -> {
  139 + try {
  140 + return decode(message);
  141 + } catch (InvalidProtocolBufferException e) {
  142 + log.error("Failed to decode message: [{}].", message);
  143 + throw new RuntimeException("Failed to decode message.", e);
  144 + }
  145 + }).collect(Collectors.toList());
  146 + if (result.size() > 0) {
  147 + return result;
  148 + }
  149 + }
  150 + try {
  151 + Thread.sleep(durationInMillis);
  152 + } catch (InterruptedException e) {
  153 + if (!stopped) {
  154 + log.error("Failed to wait.", e);
  155 + }
  156 + }
  157 + return Collections.emptyList();
  158 + }
  159 +
  160 + @Override
  161 + public void commit() {
  162 + try {
  163 + channel.basicAck(0, true);
  164 + } catch (IOException e) {
  165 + log.error("Failed to ack messages.", e);
  166 + }
  167 + }
  168 +
  169 + public T decode(GetResponse message) throws InvalidProtocolBufferException {
  170 + DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
  171 + return decoder.decode(msg);
  172 + }
  173 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.rabbitmq;
  17 +
  18 +import com.google.common.util.concurrent.ListeningExecutorService;
  19 +import com.google.common.util.concurrent.MoreExecutors;
  20 +import com.google.gson.Gson;
  21 +import com.rabbitmq.client.AMQP;
  22 +import com.rabbitmq.client.Channel;
  23 +import com.rabbitmq.client.Connection;
  24 +import lombok.extern.slf4j.Slf4j;
  25 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  26 +import org.thingsboard.server.queue.TbQueueAdmin;
  27 +import org.thingsboard.server.queue.TbQueueCallback;
  28 +import org.thingsboard.server.queue.TbQueueMsg;
  29 +import org.thingsboard.server.queue.TbQueueProducer;
  30 +import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
  31 +
  32 +import java.io.IOException;
  33 +import java.util.concurrent.Executors;
  34 +import java.util.concurrent.TimeoutException;
  35 +
  36 +@Slf4j
  37 +public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> {
  38 + private final String defaultTopic;
  39 + private final Gson gson = new Gson();
  40 + private final TbQueueAdmin admin;
  41 + private final TbRabbitMqSettings rabbitMqSettings;
  42 + private ListeningExecutorService producerExecutor;
  43 + private final Channel channel;
  44 + private final Connection connection;
  45 +
  46 + public TbRabbitMqProducerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
  47 + this.admin = admin;
  48 + this.defaultTopic = defaultTopic;
  49 + this.rabbitMqSettings = rabbitMqSettings;
  50 + producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
  51 + try {
  52 + connection = rabbitMqSettings.getConnectionFactory().newConnection();
  53 + } catch (IOException | TimeoutException e) {
  54 + log.error("Failed to create connection.", e);
  55 + throw new RuntimeException("Failed to create connection.", e);
  56 + }
  57 +
  58 + try {
  59 + channel = connection.createChannel();
  60 + } catch (IOException e) {
  61 + log.error("Failed to create chanel.", e);
  62 + throw new RuntimeException("Failed to create chanel.", e);
  63 + }
  64 + }
  65 +
  66 + @Override
  67 + public void init() {
  68 +
  69 + }
  70 +
  71 + @Override
  72 + public String getDefaultTopic() {
  73 + return defaultTopic;
  74 + }
  75 +
  76 + @Override
  77 + public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
  78 + AMQP.BasicProperties properties = new AMQP.BasicProperties();
  79 + try {
  80 + channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes());
  81 + if (callback != null) {
  82 + callback.onSuccess(null);
  83 + }
  84 + } catch (IOException e) {
  85 + log.error("Failed publish message: [{}].", msg, e);
  86 + if (callback != null) {
  87 + callback.onFailure(e);
  88 + }
  89 + }
  90 + }
  91 +
  92 + @Override
  93 + public void stop() {
  94 + if (producerExecutor != null) {
  95 + producerExecutor.shutdownNow();
  96 + }
  97 + if (channel != null) {
  98 + try {
  99 + channel.close();
  100 + } catch (IOException | TimeoutException e) {
  101 + log.error("Failed to close the channel.");
  102 + }
  103 + }
  104 + if (connection != null) {
  105 + try {
  106 + connection.close();
  107 + } catch (IOException e) {
  108 + log.error("Failed to close the connection.");
  109 + }
  110 + }
  111 + }
  112 +
  113 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.rabbitmq;
  17 +
  18 +import com.rabbitmq.client.ConnectionFactory;
  19 +import lombok.Data;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.springframework.beans.factory.annotation.Value;
  22 +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  23 +import org.springframework.stereotype.Component;
  24 +
  25 +import javax.annotation.PostConstruct;
  26 +
  27 +@Slf4j
  28 +@ConditionalOnExpression("'${queue.type:null}'=='rabbitmq'")
  29 +@Component
  30 +@Data
  31 +public class TbRabbitMqSettings {
  32 + @Value("${queue.rabbitmq.exchange_name:}")
  33 + private String exchangeName;
  34 + @Value("${queue.rabbitmq.host:}")
  35 + private String host;
  36 + @Value("${queue.rabbitmq.port:}")
  37 + private int port;
  38 + @Value("${queue.rabbitmq.virtual_host:}")
  39 + private String virtualHost;
  40 + @Value("${queue.rabbitmq.username:}")
  41 + private String username;
  42 + @Value("${queue.rabbitmq.password:}")
  43 + private String password;
  44 + @Value("${queue.rabbitmq.automatic_recovery_enabled:}")
  45 + private boolean automaticRecoveryEnabled;
  46 + @Value("${queue.rabbitmq.connection_timeout:}")
  47 + private int connectionTimeout;
  48 + @Value("${queue.rabbitmq.handshake_timeout:}")
  49 + private int handshakeTimeout;
  50 +
  51 + private ConnectionFactory connectionFactory;
  52 +
  53 + @PostConstruct
  54 + private void init() {
  55 + connectionFactory = new ConnectionFactory();
  56 + connectionFactory.setHost(host);
  57 + connectionFactory.setPort(port);
  58 + connectionFactory.setVirtualHost(virtualHost);
  59 + connectionFactory.setUsername(username);
  60 + connectionFactory.setPassword(password);
  61 + connectionFactory.setAutomaticRecoveryEnabled(automaticRecoveryEnabled);
  62 + connectionFactory.setConnectionTimeout(connectionTimeout);
  63 + connectionFactory.setHandshakeTimeout(handshakeTimeout);
  64 + }
  65 +}
... ...
... ... @@ -78,7 +78,7 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
78 78 public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
79 79 SendMessageRequest sendMsgRequest = new SendMessageRequest();
80 80 sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName()));
81   - sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg.getKey(), msg.getData())));
  81 + sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg)));
82 82
83 83 sendMsgRequest.withMessageGroupId(msg.getKey().toString());
84 84 ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
... ...
... ... @@ -88,6 +88,16 @@ queue:
88 88 sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
89 89 sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
90 90 max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
  91 + rabbitmq:
  92 + exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}"
  93 + host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}"
  94 + port: "${TB_QUEUE_RABBIT_MQ_PORT:5672}"
  95 + virtual_host: "${TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST:/}"
  96 + username: "${TB_QUEUE_RABBIT_MQ_USERNAME:YOUR_USERNAME}"
  97 + password: "${TB_QUEUE_RABBIT_MQ_PASSWORD:YOUR_PASSWORD}"
  98 + automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}"
  99 + connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}"
  100 + handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}"
91 101 partitions:
92 102 hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
93 103 virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
... ...