Commit a6e090ef868fab068c21a65dc76a11c8c240e780
Committed by
GitHub
1 parent
2553cf6b
Develop/2.5 pubsub (#2566)
* created Aws Sqs Queue * improvement AwsSqs providers * created pubsub queue * revert package-lock.json * Aws sqs improvements * Aws sqs improvements * Aws sqs improvements * Aws sqs improvements * Created pubsub queue * aws improvements * aws improvements * aws improvements * added visibility timeout to aws queue * pub sub improvements * pub sub improvements * aws sqs improvements * pub sub improvements * added comment to transport.yml about ack deadline
Showing
19 changed files
with
1078 additions
and
32 deletions
@@ -517,7 +517,7 @@ swagger: | @@ -517,7 +517,7 @@ swagger: | ||
517 | version: "${SWAGGER_VERSION:2.0}" | 517 | version: "${SWAGGER_VERSION:2.0}" |
518 | 518 | ||
519 | queue: | 519 | queue: |
520 | - type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs | 520 | + type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub |
521 | kafka: | 521 | kafka: |
522 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" | 522 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" |
523 | acks: "${TB_KAFKA_ACKS:all}" | 523 | acks: "${TB_KAFKA_ACKS:all}" |
@@ -530,7 +530,13 @@ queue: | @@ -530,7 +530,13 @@ queue: | ||
530 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" | 530 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
531 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" | 531 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |
532 | threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" | 532 | threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" |
533 | - visibility_timeout: "${TB_QUEUE_AWS_SQS_VISIBILITY_TIMEOUT:30}" #in seconds | 533 | + visibility_timeout: "${TB_QUEUE_AWS_SQS_VISIBILITY_TIMEOUT:30}" #In seconds. If messages wont commit in this time, messages will poll again |
534 | + pubsub: | ||
535 | + project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" | ||
536 | + service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" | ||
537 | + ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again | ||
538 | + max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes | ||
539 | + max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" | ||
534 | partitions: | 540 | partitions: |
535 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" | 541 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" |
536 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" | 542 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" |
@@ -56,7 +56,10 @@ | @@ -56,7 +56,10 @@ | ||
56 | <groupId>com.amazonaws</groupId> | 56 | <groupId>com.amazonaws</groupId> |
57 | <artifactId>aws-java-sdk-sqs</artifactId> | 57 | <artifactId>aws-java-sdk-sqs</artifactId> |
58 | </dependency> | 58 | </dependency> |
59 | - | 59 | + <dependency> |
60 | + <groupId>com.google.cloud</groupId> | ||
61 | + <artifactId>google-cloud-pubsub</artifactId> | ||
62 | + </dependency> | ||
60 | <dependency> | 63 | <dependency> |
61 | <groupId>org.springframework</groupId> | 64 | <groupId>org.springframework</groupId> |
62 | <artifactId>spring-context-support</artifactId> | 65 | <artifactId>spring-context-support</artifactId> |
common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueMsg.java
renamed from
common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsMsg.java
@@ -13,7 +13,7 @@ | @@ -13,7 +13,7 @@ | ||
13 | * See the License for the specific language governing permissions and | 13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. | 14 | * limitations under the License. |
15 | */ | 15 | */ |
16 | -package org.thingsboard.server.queue.sqs; | 16 | +package org.thingsboard.server.queue.common; |
17 | 17 | ||
18 | import com.google.gson.annotations.Expose; | 18 | import com.google.gson.annotations.Expose; |
19 | import lombok.Data; | 19 | import lombok.Data; |
@@ -23,16 +23,15 @@ import org.thingsboard.server.queue.TbQueueMsgHeaders; | @@ -23,16 +23,15 @@ import org.thingsboard.server.queue.TbQueueMsgHeaders; | ||
23 | import java.util.UUID; | 23 | import java.util.UUID; |
24 | 24 | ||
25 | @Data | 25 | @Data |
26 | -public class TbAwsSqsMsg implements TbQueueMsg { | 26 | +public class DefaultTbQueueMsg implements TbQueueMsg { |
27 | private final UUID key; | 27 | private final UUID key; |
28 | private final byte[] data; | 28 | private final byte[] data; |
29 | 29 | ||
30 | - public TbAwsSqsMsg(UUID key, byte[] data) { | 30 | + public DefaultTbQueueMsg(UUID key, byte[] data) { |
31 | this.key = key; | 31 | this.key = key; |
32 | this.data = data; | 32 | this.data = data; |
33 | } | 33 | } |
34 | 34 | ||
35 | @Expose(serialize = false, deserialize = false) | 35 | @Expose(serialize = false, deserialize = false) |
36 | private TbQueueMsgHeaders headers; | 36 | private TbQueueMsgHeaders headers; |
37 | - | ||
38 | } | 37 | } |
@@ -96,7 +96,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response | @@ -96,7 +96,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response | ||
96 | continue; | 96 | continue; |
97 | } | 97 | } |
98 | responses.forEach(response -> { | 98 | responses.forEach(response -> { |
99 | - log.trace("Received response to Kafka Template request: {}", response); | 99 | + log.trace("Received response to Queue Template request: {}", response); |
100 | byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER); | 100 | byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER); |
101 | UUID requestId; | 101 | UUID requestId; |
102 | if (requestIdHeader == null) { | 102 | if (requestIdHeader == null) { |
@@ -30,7 +30,6 @@ import org.thingsboard.server.queue.TbQueueCoreSettings; | @@ -30,7 +30,6 @@ import org.thingsboard.server.queue.TbQueueCoreSettings; | ||
30 | import org.thingsboard.server.queue.TbQueueProducer; | 30 | import org.thingsboard.server.queue.TbQueueProducer; |
31 | import org.thingsboard.server.queue.TbQueueRuleEngineSettings; | 31 | import org.thingsboard.server.queue.TbQueueRuleEngineSettings; |
32 | import org.thingsboard.server.queue.TbQueueTransportApiSettings; | 32 | import org.thingsboard.server.queue.TbQueueTransportApiSettings; |
33 | -import org.thingsboard.server.queue.TbQueueTransportNotificationSettings; | ||
34 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 33 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
35 | import org.thingsboard.server.queue.discovery.PartitionService; | 34 | import org.thingsboard.server.queue.discovery.PartitionService; |
36 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 35 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
@@ -48,21 +47,18 @@ public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider { | @@ -48,21 +47,18 @@ public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider { | ||
48 | private final TbQueueCoreSettings coreSettings; | 47 | private final TbQueueCoreSettings coreSettings; |
49 | private final TbQueueRuleEngineSettings ruleEngineSettings; | 48 | private final TbQueueRuleEngineSettings ruleEngineSettings; |
50 | private final TbQueueTransportApiSettings transportApiSettings; | 49 | private final TbQueueTransportApiSettings transportApiSettings; |
51 | - private final TbQueueTransportNotificationSettings transportNotificationSettings; | ||
52 | 50 | ||
53 | public KafkaTbCoreQueueProvider(PartitionService partitionService, TbKafkaSettings kafkaSettings, | 51 | public KafkaTbCoreQueueProvider(PartitionService partitionService, TbKafkaSettings kafkaSettings, |
54 | TbServiceInfoProvider serviceInfoProvider, | 52 | TbServiceInfoProvider serviceInfoProvider, |
55 | TbQueueCoreSettings coreSettings, | 53 | TbQueueCoreSettings coreSettings, |
56 | TbQueueRuleEngineSettings ruleEngineSettings, | 54 | TbQueueRuleEngineSettings ruleEngineSettings, |
57 | - TbQueueTransportApiSettings transportApiSettings, | ||
58 | - TbQueueTransportNotificationSettings transportNotificationSettings) { | 55 | + TbQueueTransportApiSettings transportApiSettings) { |
59 | this.partitionService = partitionService; | 56 | this.partitionService = partitionService; |
60 | this.kafkaSettings = kafkaSettings; | 57 | this.kafkaSettings = kafkaSettings; |
61 | this.serviceInfoProvider = serviceInfoProvider; | 58 | this.serviceInfoProvider = serviceInfoProvider; |
62 | this.coreSettings = coreSettings; | 59 | this.coreSettings = coreSettings; |
63 | this.ruleEngineSettings = ruleEngineSettings; | 60 | this.ruleEngineSettings = ruleEngineSettings; |
64 | this.transportApiSettings = transportApiSettings; | 61 | this.transportApiSettings = transportApiSettings; |
65 | - this.transportNotificationSettings = transportNotificationSettings; | ||
66 | } | 62 | } |
67 | 63 | ||
68 | @Override | 64 | @Override |
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueProvider.java
@@ -27,8 +27,6 @@ import org.thingsboard.server.queue.TbQueueConsumer; | @@ -27,8 +27,6 @@ import org.thingsboard.server.queue.TbQueueConsumer; | ||
27 | import org.thingsboard.server.queue.TbQueueCoreSettings; | 27 | import org.thingsboard.server.queue.TbQueueCoreSettings; |
28 | import org.thingsboard.server.queue.TbQueueProducer; | 28 | import org.thingsboard.server.queue.TbQueueProducer; |
29 | import org.thingsboard.server.queue.TbQueueRuleEngineSettings; | 29 | import org.thingsboard.server.queue.TbQueueRuleEngineSettings; |
30 | -import org.thingsboard.server.queue.TbQueueTransportApiSettings; | ||
31 | -import org.thingsboard.server.queue.TbQueueTransportNotificationSettings; | ||
32 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; | 30 | import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
33 | import org.thingsboard.server.queue.discovery.PartitionService; | 31 | import org.thingsboard.server.queue.discovery.PartitionService; |
34 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | 32 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
@@ -45,22 +43,16 @@ public class KafkaTbRuleEngineQueueProvider implements TbRuleEngineQueueProvider | @@ -45,22 +43,16 @@ public class KafkaTbRuleEngineQueueProvider implements TbRuleEngineQueueProvider | ||
45 | private final TbServiceInfoProvider serviceInfoProvider; | 43 | private final TbServiceInfoProvider serviceInfoProvider; |
46 | private final TbQueueCoreSettings coreSettings; | 44 | private final TbQueueCoreSettings coreSettings; |
47 | private final TbQueueRuleEngineSettings ruleEngineSettings; | 45 | private final TbQueueRuleEngineSettings ruleEngineSettings; |
48 | - private final TbQueueTransportApiSettings transportApiSettings; | ||
49 | - private final TbQueueTransportNotificationSettings transportNotificationSettings; | ||
50 | 46 | ||
51 | public KafkaTbRuleEngineQueueProvider(PartitionService partitionService, TbKafkaSettings kafkaSettings, | 47 | public KafkaTbRuleEngineQueueProvider(PartitionService partitionService, TbKafkaSettings kafkaSettings, |
52 | TbServiceInfoProvider serviceInfoProvider, | 48 | TbServiceInfoProvider serviceInfoProvider, |
53 | TbQueueCoreSettings coreSettings, | 49 | TbQueueCoreSettings coreSettings, |
54 | - TbQueueRuleEngineSettings ruleEngineSettings, | ||
55 | - TbQueueTransportApiSettings transportApiSettings, | ||
56 | - TbQueueTransportNotificationSettings transportNotificationSettings) { | 50 | + TbQueueRuleEngineSettings ruleEngineSettings) { |
57 | this.partitionService = partitionService; | 51 | this.partitionService = partitionService; |
58 | this.kafkaSettings = kafkaSettings; | 52 | this.kafkaSettings = kafkaSettings; |
59 | this.serviceInfoProvider = serviceInfoProvider; | 53 | this.serviceInfoProvider = serviceInfoProvider; |
60 | this.coreSettings = coreSettings; | 54 | this.coreSettings = coreSettings; |
61 | this.ruleEngineSettings = ruleEngineSettings; | 55 | this.ruleEngineSettings = ruleEngineSettings; |
62 | - this.transportApiSettings = transportApiSettings; | ||
63 | - this.transportNotificationSettings = transportNotificationSettings; | ||
64 | } | 56 | } |
65 | 57 | ||
66 | @Override | 58 | @Override |
common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueProvider.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.provider; | ||
17 | + | ||
18 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | ||
19 | +import org.springframework.stereotype.Component; | ||
20 | +import org.thingsboard.server.common.msg.queue.ServiceType; | ||
21 | +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; | ||
22 | +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; | ||
23 | +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; | ||
24 | +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; | ||
25 | +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; | ||
26 | +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; | ||
27 | +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; | ||
28 | +import org.thingsboard.server.queue.TbQueueAdmin; | ||
29 | +import org.thingsboard.server.queue.TbQueueConsumer; | ||
30 | +import org.thingsboard.server.queue.TbQueueCoreSettings; | ||
31 | +import org.thingsboard.server.queue.TbQueueProducer; | ||
32 | +import org.thingsboard.server.queue.TbQueueRuleEngineSettings; | ||
33 | +import org.thingsboard.server.queue.TbQueueTransportApiSettings; | ||
34 | +import org.thingsboard.server.queue.TbQueueTransportNotificationSettings; | ||
35 | +import org.thingsboard.server.queue.common.TbProtoQueueMsg; | ||
36 | +import org.thingsboard.server.queue.discovery.PartitionService; | ||
37 | +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | ||
38 | +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; | ||
39 | +import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; | ||
40 | +import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; | ||
41 | +import org.thingsboard.server.queue.pubsub.TbPubSubSettings; | ||
42 | + | ||
43 | +@Component | ||
44 | +@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='monolith'") | ||
45 | +public class PubSubMonolithQueueProvider implements TbCoreQueueProvider, TbRuleEngineQueueProvider { | ||
46 | + | ||
47 | + private final TbPubSubSettings pubSubSettings; | ||
48 | + private final TbQueueCoreSettings coreSettings; | ||
49 | + private final TbQueueRuleEngineSettings ruleEngineSettings; | ||
50 | + private final TbQueueTransportApiSettings transportApiSettings; | ||
51 | + private final TbQueueTransportNotificationSettings transportNotificationSettings; | ||
52 | + private final TbQueueAdmin admin; | ||
53 | + private final PartitionService partitionService; | ||
54 | + private final TbServiceInfoProvider serviceInfoProvider; | ||
55 | + | ||
56 | + private TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreProducer; | ||
57 | + | ||
58 | + public PubSubMonolithQueueProvider(TbPubSubSettings pubSubSettings, | ||
59 | + TbQueueCoreSettings coreSettings, | ||
60 | + TbQueueRuleEngineSettings ruleEngineSettings, | ||
61 | + TbQueueTransportApiSettings transportApiSettings, | ||
62 | + TbQueueTransportNotificationSettings transportNotificationSettings, | ||
63 | + PartitionService partitionService, | ||
64 | + TbServiceInfoProvider serviceInfoProvider) { | ||
65 | + this.pubSubSettings = pubSubSettings; | ||
66 | + this.coreSettings = coreSettings; | ||
67 | + this.ruleEngineSettings = ruleEngineSettings; | ||
68 | + this.transportApiSettings = transportApiSettings; | ||
69 | + this.transportNotificationSettings = transportNotificationSettings; | ||
70 | + this.admin = new TbPubSubAdmin(pubSubSettings); | ||
71 | + this.partitionService = partitionService; | ||
72 | + this.serviceInfoProvider = serviceInfoProvider; | ||
73 | + } | ||
74 | + | ||
75 | + @Override | ||
76 | + public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() { | ||
77 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportNotificationSettings.getNotificationsTopic()); | ||
78 | + } | ||
79 | + | ||
80 | + @Override | ||
81 | + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() { | ||
82 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic()); | ||
83 | + | ||
84 | + } | ||
85 | + | ||
86 | + @Override | ||
87 | + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() { | ||
88 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic()); | ||
89 | + } | ||
90 | + | ||
91 | + @Override | ||
92 | + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() { | ||
93 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
94 | + } | ||
95 | + | ||
96 | + @Override | ||
97 | + public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() { | ||
98 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
99 | + } | ||
100 | + | ||
101 | + @Override | ||
102 | + public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> getToRuleEngineMsgConsumer() { | ||
103 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic(), | ||
104 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
105 | + } | ||
106 | + | ||
107 | + @Override | ||
108 | + public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> getToRuleEngineNotificationsMsgConsumer() { | ||
109 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, | ||
110 | + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), | ||
111 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
112 | + } | ||
113 | + | ||
114 | + @Override | ||
115 | + public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> getToCoreMsgConsumer() { | ||
116 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, coreSettings.getTopic(), | ||
117 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
118 | + } | ||
119 | + | ||
120 | + @Override | ||
121 | + public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> getToCoreNotificationsMsgConsumer() { | ||
122 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, | ||
123 | + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), | ||
124 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
125 | + } | ||
126 | + | ||
127 | + @Override | ||
128 | + public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> getTransportApiRequestConsumer() { | ||
129 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic(), | ||
130 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
131 | + } | ||
132 | + | ||
133 | + @Override | ||
134 | + public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiResponseProducer() { | ||
135 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportApiSettings.getResponsesTopic()); | ||
136 | + } | ||
137 | +} |
common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueProvider.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.provider; | ||
17 | + | ||
18 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | ||
19 | +import org.springframework.stereotype.Component; | ||
20 | +import org.thingsboard.server.common.msg.queue.ServiceType; | ||
21 | +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; | ||
22 | +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; | ||
23 | +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; | ||
24 | +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; | ||
25 | +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; | ||
26 | +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; | ||
27 | +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; | ||
28 | +import org.thingsboard.server.queue.TbQueueAdmin; | ||
29 | +import org.thingsboard.server.queue.TbQueueConsumer; | ||
30 | +import org.thingsboard.server.queue.TbQueueCoreSettings; | ||
31 | +import org.thingsboard.server.queue.TbQueueProducer; | ||
32 | +import org.thingsboard.server.queue.TbQueueTransportApiSettings; | ||
33 | +import org.thingsboard.server.queue.common.TbProtoQueueMsg; | ||
34 | +import org.thingsboard.server.queue.discovery.PartitionService; | ||
35 | +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | ||
36 | +import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; | ||
37 | +import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; | ||
38 | +import org.thingsboard.server.queue.pubsub.TbPubSubSettings; | ||
39 | + | ||
40 | +@Component | ||
41 | +@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-core'") | ||
42 | +public class PubSubTbCoreQueueProvider implements TbCoreQueueProvider { | ||
43 | + | ||
44 | + private final TbPubSubSettings pubSubSettings; | ||
45 | + private final TbQueueCoreSettings coreSettings; | ||
46 | + private final TbQueueTransportApiSettings transportApiSettings; | ||
47 | + private final TbQueueAdmin admin; | ||
48 | + private final PartitionService partitionService; | ||
49 | + private final TbServiceInfoProvider serviceInfoProvider; | ||
50 | + | ||
51 | + public PubSubTbCoreQueueProvider(TbPubSubSettings pubSubSettings, | ||
52 | + TbQueueCoreSettings coreSettings, | ||
53 | + TbQueueTransportApiSettings transportApiSettings, | ||
54 | + TbQueueAdmin admin, | ||
55 | + PartitionService partitionService, | ||
56 | + TbServiceInfoProvider serviceInfoProvider) { | ||
57 | + this.pubSubSettings = pubSubSettings; | ||
58 | + this.coreSettings = coreSettings; | ||
59 | + this.transportApiSettings = transportApiSettings; | ||
60 | + this.admin = admin; | ||
61 | + this.partitionService = partitionService; | ||
62 | + this.serviceInfoProvider = serviceInfoProvider; | ||
63 | + } | ||
64 | + | ||
65 | + @Override | ||
66 | + public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() { | ||
67 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
68 | + } | ||
69 | + | ||
70 | + @Override | ||
71 | + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() { | ||
72 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
73 | + } | ||
74 | + | ||
75 | + @Override | ||
76 | + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() { | ||
77 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
78 | + } | ||
79 | + | ||
80 | + @Override | ||
81 | + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() { | ||
82 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
83 | + } | ||
84 | + | ||
85 | + @Override | ||
86 | + public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() { | ||
87 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
88 | + } | ||
89 | + | ||
90 | + @Override | ||
91 | + public TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> getToCoreMsgConsumer() { | ||
92 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, coreSettings.getTopic(), | ||
93 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
94 | + } | ||
95 | + | ||
96 | + @Override | ||
97 | + public TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> getToCoreNotificationsMsgConsumer() { | ||
98 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, | ||
99 | + partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName(), | ||
100 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
101 | + } | ||
102 | + | ||
103 | + @Override | ||
104 | + public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> getTransportApiRequestConsumer() { | ||
105 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic(), | ||
106 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
107 | + } | ||
108 | + | ||
109 | + @Override | ||
110 | + public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiResponseProducer() { | ||
111 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
112 | + } | ||
113 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.provider; | ||
17 | + | ||
18 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | ||
19 | +import org.springframework.stereotype.Component; | ||
20 | +import org.thingsboard.server.common.msg.queue.ServiceType; | ||
21 | +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; | ||
22 | +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; | ||
23 | +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; | ||
24 | +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; | ||
25 | +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; | ||
26 | +import org.thingsboard.server.queue.TbQueueAdmin; | ||
27 | +import org.thingsboard.server.queue.TbQueueConsumer; | ||
28 | +import org.thingsboard.server.queue.TbQueueCoreSettings; | ||
29 | +import org.thingsboard.server.queue.TbQueueProducer; | ||
30 | +import org.thingsboard.server.queue.TbQueueRuleEngineSettings; | ||
31 | +import org.thingsboard.server.queue.common.TbProtoQueueMsg; | ||
32 | +import org.thingsboard.server.queue.discovery.PartitionService; | ||
33 | +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | ||
34 | +import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; | ||
35 | +import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; | ||
36 | +import org.thingsboard.server.queue.pubsub.TbPubSubSettings; | ||
37 | + | ||
38 | +@Component | ||
39 | +@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && '${service.type:null}'=='tb-rule-engine'") | ||
40 | +public class PubSubTbRuleEngineQueueProvider implements TbRuleEngineQueueProvider { | ||
41 | + | ||
42 | + private final TbPubSubSettings pubSubSettings; | ||
43 | + private final TbQueueCoreSettings coreSettings; | ||
44 | + private final TbQueueRuleEngineSettings ruleEngineSettings; | ||
45 | + private final TbQueueAdmin admin; | ||
46 | + private final PartitionService partitionService; | ||
47 | + private final TbServiceInfoProvider serviceInfoProvider; | ||
48 | + | ||
49 | + public PubSubTbRuleEngineQueueProvider(TbPubSubSettings pubSubSettings, | ||
50 | + TbQueueCoreSettings coreSettings, | ||
51 | + TbQueueRuleEngineSettings ruleEngineSettings, | ||
52 | + TbQueueAdmin admin, | ||
53 | + PartitionService partitionService, | ||
54 | + TbServiceInfoProvider serviceInfoProvider) { | ||
55 | + this.pubSubSettings = pubSubSettings; | ||
56 | + this.coreSettings = coreSettings; | ||
57 | + this.ruleEngineSettings = ruleEngineSettings; | ||
58 | + this.admin = admin; | ||
59 | + this.partitionService = partitionService; | ||
60 | + this.serviceInfoProvider = serviceInfoProvider; | ||
61 | + } | ||
62 | + | ||
63 | + @Override | ||
64 | + public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsMsgProducer() { | ||
65 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
66 | + } | ||
67 | + | ||
68 | + @Override | ||
69 | + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() { | ||
70 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
71 | + } | ||
72 | + | ||
73 | + @Override | ||
74 | + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> getRuleEngineNotificationsMsgProducer() { | ||
75 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic()); | ||
76 | + } | ||
77 | + | ||
78 | + @Override | ||
79 | + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() { | ||
80 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
81 | + | ||
82 | + } | ||
83 | + | ||
84 | + @Override | ||
85 | + public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> getTbCoreNotificationsMsgProducer() { | ||
86 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
87 | + } | ||
88 | + | ||
89 | + @Override | ||
90 | + public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> getToRuleEngineMsgConsumer() { | ||
91 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic(), | ||
92 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
93 | + } | ||
94 | + | ||
95 | + @Override | ||
96 | + public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> getToRuleEngineNotificationsMsgConsumer() { | ||
97 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, | ||
98 | + partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceInfoProvider.getServiceId()).getFullTopicName(), | ||
99 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
100 | + } | ||
101 | +} |
common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTransportQueueProvider.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.provider; | ||
17 | + | ||
18 | +import lombok.extern.slf4j.Slf4j; | ||
19 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | ||
20 | +import org.springframework.stereotype.Component; | ||
21 | +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; | ||
22 | +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; | ||
23 | +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; | ||
24 | +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; | ||
25 | +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; | ||
26 | +import org.thingsboard.server.queue.TbQueueAdmin; | ||
27 | +import org.thingsboard.server.queue.TbQueueConsumer; | ||
28 | +import org.thingsboard.server.queue.TbQueueCoreSettings; | ||
29 | +import org.thingsboard.server.queue.TbQueueProducer; | ||
30 | +import org.thingsboard.server.queue.TbQueueRequestTemplate; | ||
31 | +import org.thingsboard.server.queue.TbQueueRuleEngineSettings; | ||
32 | +import org.thingsboard.server.queue.TbQueueTransportApiSettings; | ||
33 | +import org.thingsboard.server.queue.TbQueueTransportNotificationSettings; | ||
34 | +import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate; | ||
35 | +import org.thingsboard.server.queue.common.TbProtoQueueMsg; | ||
36 | +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; | ||
37 | +import org.thingsboard.server.queue.pubsub.TbPubSubAdmin; | ||
38 | +import org.thingsboard.server.queue.pubsub.TbPubSubConsumerTemplate; | ||
39 | +import org.thingsboard.server.queue.pubsub.TbPubSubProducerTemplate; | ||
40 | +import org.thingsboard.server.queue.pubsub.TbPubSubSettings; | ||
41 | + | ||
42 | +@Component | ||
43 | +@ConditionalOnExpression("'${queue.type:null}'=='pubsub' && ('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport')") | ||
44 | +@Slf4j | ||
45 | +public class PubSubTransportQueueProvider implements TbTransportQueueProvider { | ||
46 | + | ||
47 | + private final TbPubSubSettings pubSubSettings; | ||
48 | + private final TbServiceInfoProvider serviceInfoProvider; | ||
49 | + private final TbQueueCoreSettings coreSettings; | ||
50 | + private final TbQueueRuleEngineSettings ruleEngineSettings; | ||
51 | + private final TbQueueTransportApiSettings transportApiSettings; | ||
52 | + private final TbQueueTransportNotificationSettings transportNotificationSettings; | ||
53 | + private final TbQueueAdmin admin; | ||
54 | + | ||
55 | + public PubSubTransportQueueProvider(TbPubSubSettings pubSubSettings, | ||
56 | + TbServiceInfoProvider serviceInfoProvider, | ||
57 | + TbQueueCoreSettings coreSettings, | ||
58 | + TbQueueRuleEngineSettings ruleEngineSettings, | ||
59 | + TbQueueTransportApiSettings transportApiSettings, | ||
60 | + TbQueueTransportNotificationSettings transportNotificationSettings) { | ||
61 | + this.pubSubSettings = pubSubSettings; | ||
62 | + this.serviceInfoProvider = serviceInfoProvider; | ||
63 | + this.coreSettings = coreSettings; | ||
64 | + this.ruleEngineSettings = ruleEngineSettings; | ||
65 | + this.transportApiSettings = transportApiSettings; | ||
66 | + this.transportNotificationSettings = transportNotificationSettings; | ||
67 | + this.admin = new TbPubSubAdmin(pubSubSettings); | ||
68 | + } | ||
69 | + | ||
70 | + @Override | ||
71 | + public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiRequestTemplate() { | ||
72 | + TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producer = new TbPubSubProducerTemplate<>(admin, pubSubSettings, transportApiSettings.getRequestsTopic()); | ||
73 | + TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumer = new TbPubSubConsumerTemplate<>(admin, pubSubSettings, | ||
74 | + transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(), | ||
75 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
76 | + | ||
77 | + DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder | ||
78 | + <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder(); | ||
79 | + templateBuilder.queueAdmin(admin); | ||
80 | + templateBuilder.requestTemplate(producer); | ||
81 | + templateBuilder.responseTemplate(consumer); | ||
82 | + templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests()); | ||
83 | + templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout()); | ||
84 | + templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval()); | ||
85 | + return templateBuilder.build(); | ||
86 | + } | ||
87 | + | ||
88 | + @Override | ||
89 | + public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() { | ||
90 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, ruleEngineSettings.getTopic()); | ||
91 | + } | ||
92 | + | ||
93 | + @Override | ||
94 | + public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() { | ||
95 | + return new TbPubSubProducerTemplate<>(admin, pubSubSettings, coreSettings.getTopic()); | ||
96 | + } | ||
97 | + | ||
98 | + @Override | ||
99 | + public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsConsumer() { | ||
100 | + return new TbPubSubConsumerTemplate<>(admin, pubSubSettings, | ||
101 | + transportNotificationSettings.getNotificationsTopic() + "." + serviceInfoProvider.getServiceId(), | ||
102 | + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToTransportMsg.parseFrom(msg.getData()), msg.getHeaders())); | ||
103 | + } | ||
104 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.pubsub; | ||
17 | + | ||
18 | +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; | ||
19 | +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; | ||
20 | +import com.google.cloud.pubsub.v1.TopicAdminClient; | ||
21 | +import com.google.cloud.pubsub.v1.TopicAdminSettings; | ||
22 | +import com.google.pubsub.v1.ListSubscriptionsRequest; | ||
23 | +import com.google.pubsub.v1.ListTopicsRequest; | ||
24 | +import com.google.pubsub.v1.ProjectName; | ||
25 | +import com.google.pubsub.v1.ProjectSubscriptionName; | ||
26 | +import com.google.pubsub.v1.ProjectTopicName; | ||
27 | +import com.google.pubsub.v1.PushConfig; | ||
28 | +import com.google.pubsub.v1.Subscription; | ||
29 | +import com.google.pubsub.v1.Topic; | ||
30 | +import lombok.extern.slf4j.Slf4j; | ||
31 | +import org.thingsboard.server.queue.TbQueueAdmin; | ||
32 | + | ||
33 | +import java.io.IOException; | ||
34 | +import java.util.Set; | ||
35 | +import java.util.concurrent.ConcurrentHashMap; | ||
36 | + | ||
37 | +@Slf4j | ||
38 | +public class TbPubSubAdmin implements TbQueueAdmin { | ||
39 | + | ||
40 | + private final TbPubSubSettings pubSubSettings; | ||
41 | + private final SubscriptionAdminSettings subscriptionAdminSettings; | ||
42 | + private final TopicAdminSettings topicAdminSettings; | ||
43 | + private final Set<String> topicSet = ConcurrentHashMap.newKeySet(); | ||
44 | + private final Set<String> subscriptionSet = ConcurrentHashMap.newKeySet(); | ||
45 | + | ||
46 | + public TbPubSubAdmin(TbPubSubSettings pubSubSettings) { | ||
47 | + this.pubSubSettings = pubSubSettings; | ||
48 | + | ||
49 | + try { | ||
50 | + topicAdminSettings = TopicAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); | ||
51 | + } catch (IOException e) { | ||
52 | + log.error("Failed to create TopicAdminSettings"); | ||
53 | + throw new RuntimeException("Failed to create TopicAdminSettings."); | ||
54 | + } | ||
55 | + | ||
56 | + try { | ||
57 | + subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder().setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); | ||
58 | + } catch (IOException e) { | ||
59 | + log.error("Failed to create SubscriptionAdminSettings"); | ||
60 | + throw new RuntimeException("Failed to create SubscriptionAdminSettings."); | ||
61 | + } | ||
62 | + | ||
63 | + try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) { | ||
64 | + ListTopicsRequest listTopicsRequest = | ||
65 | + ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build(); | ||
66 | + TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest); | ||
67 | + for (Topic topic : response.iterateAll()) { | ||
68 | + topicSet.add(topic.getName()); | ||
69 | + } | ||
70 | + } catch (IOException e) { | ||
71 | + log.error("Failed to get topics.", e); | ||
72 | + throw new RuntimeException("Failed to get topics.", e); | ||
73 | + } | ||
74 | + | ||
75 | + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) { | ||
76 | + | ||
77 | + ListSubscriptionsRequest listSubscriptionsRequest = | ||
78 | + ListSubscriptionsRequest.newBuilder() | ||
79 | + .setProject(ProjectName.of(pubSubSettings.getProjectId()).toString()) | ||
80 | + .build(); | ||
81 | + SubscriptionAdminClient.ListSubscriptionsPagedResponse response = | ||
82 | + subscriptionAdminClient.listSubscriptions(listSubscriptionsRequest); | ||
83 | + | ||
84 | + for (Subscription subscription : response.iterateAll()) { | ||
85 | + subscriptionSet.add(subscription.getName()); | ||
86 | + } | ||
87 | + } catch (IOException e) { | ||
88 | + log.error("Failed to get subscriptions.", e); | ||
89 | + throw new RuntimeException("Failed to get subscriptions.", e); | ||
90 | + } | ||
91 | + } | ||
92 | + | ||
93 | + @Override | ||
94 | + public void createTopicIfNotExists(String partition) { | ||
95 | + ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), partition); | ||
96 | + | ||
97 | + if (topicSet.contains(topicName.toString())) { | ||
98 | + createSubscriptionIfNotExists(partition, topicName); | ||
99 | + return; | ||
100 | + } | ||
101 | + | ||
102 | + try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) { | ||
103 | + ListTopicsRequest listTopicsRequest = | ||
104 | + ListTopicsRequest.newBuilder().setProject(ProjectName.format(pubSubSettings.getProjectId())).build(); | ||
105 | + TopicAdminClient.ListTopicsPagedResponse response = topicAdminClient.listTopics(listTopicsRequest); | ||
106 | + for (Topic topic : response.iterateAll()) { | ||
107 | + if (topic.getName().contains(topicName.toString())) { | ||
108 | + topicSet.add(topic.getName()); | ||
109 | + createSubscriptionIfNotExists(partition, topicName); | ||
110 | + return; | ||
111 | + } | ||
112 | + } | ||
113 | + | ||
114 | + topicAdminClient.createTopic(topicName); | ||
115 | + topicSet.add(topicName.toString()); | ||
116 | + log.info("Created new topic: [{}]", topicName.toString()); | ||
117 | + createSubscriptionIfNotExists(partition, topicName); | ||
118 | + } catch (IOException e) { | ||
119 | + log.error("Failed to create topic: [{}].", topicName.toString(), e); | ||
120 | + throw new RuntimeException("Failed to create topic.", e); | ||
121 | + } | ||
122 | + } | ||
123 | + | ||
124 | + private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) { | ||
125 | + ProjectSubscriptionName subscriptionName = | ||
126 | + ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition); | ||
127 | + | ||
128 | + if (subscriptionSet.contains(subscriptionName.toString())) { | ||
129 | + return; | ||
130 | + } | ||
131 | + | ||
132 | + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) { | ||
133 | + ListSubscriptionsRequest listSubscriptionsRequest = | ||
134 | + ListSubscriptionsRequest.newBuilder() | ||
135 | + .setProject(ProjectName.of(pubSubSettings.getProjectId()).toString()) | ||
136 | + .build(); | ||
137 | + SubscriptionAdminClient.ListSubscriptionsPagedResponse response = | ||
138 | + subscriptionAdminClient.listSubscriptions(listSubscriptionsRequest); | ||
139 | + | ||
140 | + for (Subscription subscription : response.iterateAll()) { | ||
141 | + if (subscription.getName().equals(subscriptionName.toString())) { | ||
142 | + subscriptionSet.add(subscription.getName()); | ||
143 | + return; | ||
144 | + } | ||
145 | + } | ||
146 | + | ||
147 | + subscriptionAdminClient.createSubscription( | ||
148 | + subscriptionName, topicName, PushConfig.getDefaultInstance(), pubSubSettings.getAckDeadline()).getName(); | ||
149 | + subscriptionSet.add(subscriptionName.toString()); | ||
150 | + log.info("Created new subscription: [{}]", subscriptionName.toString()); | ||
151 | + } catch (IOException e) { | ||
152 | + log.error("Failed to create subscription: [{}].", subscriptionName.toString(), e); | ||
153 | + throw new RuntimeException("Failed to create subscription.", e); | ||
154 | + } | ||
155 | + } | ||
156 | + | ||
157 | +} |
common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.pubsub; | ||
17 | + | ||
18 | +import com.google.api.core.ApiFuture; | ||
19 | +import com.google.api.core.ApiFutures; | ||
20 | +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; | ||
21 | +import com.google.cloud.pubsub.v1.stub.SubscriberStub; | ||
22 | +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; | ||
23 | +import com.google.common.reflect.TypeToken; | ||
24 | +import com.google.gson.Gson; | ||
25 | +import com.google.protobuf.InvalidProtocolBufferException; | ||
26 | +import com.google.pubsub.v1.AcknowledgeRequest; | ||
27 | +import com.google.pubsub.v1.ProjectSubscriptionName; | ||
28 | +import com.google.pubsub.v1.PubsubMessage; | ||
29 | +import com.google.pubsub.v1.PullRequest; | ||
30 | +import com.google.pubsub.v1.PullResponse; | ||
31 | +import com.google.pubsub.v1.ReceivedMessage; | ||
32 | +import lombok.extern.slf4j.Slf4j; | ||
33 | +import org.springframework.util.CollectionUtils; | ||
34 | +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; | ||
35 | +import org.thingsboard.server.queue.TbQueueAdmin; | ||
36 | +import org.thingsboard.server.queue.TbQueueConsumer; | ||
37 | +import org.thingsboard.server.queue.TbQueueMsg; | ||
38 | +import org.thingsboard.server.queue.TbQueueMsgDecoder; | ||
39 | +import org.thingsboard.server.queue.TbQueueMsgHeaders; | ||
40 | +import org.thingsboard.server.queue.common.DefaultTbQueueMsg; | ||
41 | +import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders; | ||
42 | + | ||
43 | +import java.io.IOException; | ||
44 | +import java.util.ArrayList; | ||
45 | +import java.util.Collections; | ||
46 | +import java.util.List; | ||
47 | +import java.util.Map; | ||
48 | +import java.util.Objects; | ||
49 | +import java.util.Set; | ||
50 | +import java.util.concurrent.CopyOnWriteArrayList; | ||
51 | +import java.util.concurrent.ExecutionException; | ||
52 | +import java.util.concurrent.ExecutorService; | ||
53 | +import java.util.concurrent.Executors; | ||
54 | +import java.util.stream.Collectors; | ||
55 | + | ||
56 | +@Slf4j | ||
57 | +public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { | ||
58 | + | ||
59 | + private final Gson gson = new Gson(); | ||
60 | + private final TbQueueAdmin admin; | ||
61 | + private final String topic; | ||
62 | + private final TbQueueMsgDecoder<T> decoder; | ||
63 | + private final TbPubSubSettings pubSubSettings; | ||
64 | + | ||
65 | + private volatile boolean subscribed; | ||
66 | + private volatile Set<TopicPartitionInfo> partitions; | ||
67 | + private volatile Set<String> subscriptionNames; | ||
68 | + private final List<AcknowledgeRequest> acknowledgeRequests = new CopyOnWriteArrayList<>(); | ||
69 | + | ||
70 | + private ExecutorService consumerExecutor; | ||
71 | + private final SubscriberStub subscriber; | ||
72 | + private volatile boolean stopped; | ||
73 | + | ||
74 | + private volatile int messagesPerTopic; | ||
75 | + | ||
76 | + public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) { | ||
77 | + this.admin = admin; | ||
78 | + this.pubSubSettings = pubSubSettings; | ||
79 | + this.topic = topic; | ||
80 | + this.decoder = decoder; | ||
81 | + | ||
82 | + try { | ||
83 | + SubscriberStubSettings subscriberStubSettings = | ||
84 | + SubscriberStubSettings.newBuilder() | ||
85 | + .setCredentialsProvider(pubSubSettings.getCredentialsProvider()) | ||
86 | + .setTransportChannelProvider( | ||
87 | + SubscriberStubSettings.defaultGrpcTransportProviderBuilder() | ||
88 | + .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize()) | ||
89 | + .build()) | ||
90 | + .build(); | ||
91 | + | ||
92 | + this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings); | ||
93 | + } catch (IOException e) { | ||
94 | + log.error("Failed to create subscriber.", e); | ||
95 | + throw new RuntimeException("Failed to create subscriber.", e); | ||
96 | + } | ||
97 | + stopped = false; | ||
98 | + } | ||
99 | + | ||
100 | + @Override | ||
101 | + public String getTopic() { | ||
102 | + return topic; | ||
103 | + } | ||
104 | + | ||
105 | + @Override | ||
106 | + public void subscribe() { | ||
107 | + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); | ||
108 | + subscribed = false; | ||
109 | + } | ||
110 | + | ||
111 | + @Override | ||
112 | + public void subscribe(Set<TopicPartitionInfo> partitions) { | ||
113 | + this.partitions = partitions; | ||
114 | + subscribed = false; | ||
115 | + } | ||
116 | + | ||
117 | + @Override | ||
118 | + public void unsubscribe() { | ||
119 | + stopped = true; | ||
120 | + if (consumerExecutor != null) { | ||
121 | + consumerExecutor.shutdownNow(); | ||
122 | + } | ||
123 | + | ||
124 | + if (subscriber != null) { | ||
125 | + subscriber.close(); | ||
126 | + } | ||
127 | + } | ||
128 | + | ||
129 | + @Override | ||
130 | + public List<T> poll(long durationInMillis) { | ||
131 | + if (!subscribed && partitions == null) { | ||
132 | + try { | ||
133 | + Thread.sleep(durationInMillis); | ||
134 | + } catch (InterruptedException e) { | ||
135 | + log.debug("Failed to await subscription", e); | ||
136 | + } | ||
137 | + } else { | ||
138 | + if (!subscribed) { | ||
139 | + subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet()); | ||
140 | + subscriptionNames.forEach(admin::createTopicIfNotExists); | ||
141 | + consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size()); | ||
142 | + messagesPerTopic = pubSubSettings.getMaxMessages()/subscriptionNames.size(); | ||
143 | + subscribed = true; | ||
144 | + } | ||
145 | + List<ReceivedMessage> messages; | ||
146 | + try { | ||
147 | + messages = receiveMessages(); | ||
148 | + if (!messages.isEmpty()) { | ||
149 | + List<T> result = new ArrayList<>(); | ||
150 | + messages.forEach(msg -> { | ||
151 | + try { | ||
152 | + result.add(decode(msg.getMessage())); | ||
153 | + } catch (InvalidProtocolBufferException e) { | ||
154 | + log.error("Failed decode record: [{}]", msg); | ||
155 | + } | ||
156 | + }); | ||
157 | + return result; | ||
158 | + } | ||
159 | + } catch (ExecutionException | InterruptedException e) { | ||
160 | + if (stopped) { | ||
161 | + log.info("[{}] Pub/Sub consumer is stopped.", topic); | ||
162 | + } else { | ||
163 | + log.error("Failed to receive messages", e); | ||
164 | + } | ||
165 | + } | ||
166 | + } | ||
167 | + return Collections.emptyList(); | ||
168 | + } | ||
169 | + | ||
170 | + @Override | ||
171 | + public void commit() { | ||
172 | + acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall); | ||
173 | + acknowledgeRequests.clear(); | ||
174 | + } | ||
175 | + | ||
176 | + private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException { | ||
177 | + List<ApiFuture<List<ReceivedMessage>>> result = subscriptionNames.stream().map(subscriptionId -> { | ||
178 | + String subscriptionName = ProjectSubscriptionName.format(pubSubSettings.getProjectId(), subscriptionId); | ||
179 | + PullRequest pullRequest = | ||
180 | + PullRequest.newBuilder() | ||
181 | + .setMaxMessages(messagesPerTopic) | ||
182 | + .setReturnImmediately(false) // return immediately if messages are not available | ||
183 | + .setSubscription(subscriptionName) | ||
184 | + .build(); | ||
185 | + | ||
186 | + ApiFuture<PullResponse> pullResponseApiFuture = subscriber.pullCallable().futureCall(pullRequest); | ||
187 | + | ||
188 | + return ApiFutures.transform(pullResponseApiFuture, pullResponse -> { | ||
189 | + if (pullResponse != null && !pullResponse.getReceivedMessagesList().isEmpty()) { | ||
190 | + List<String> ackIds = new ArrayList<>(); | ||
191 | + for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { | ||
192 | + ackIds.add(message.getAckId()); | ||
193 | + } | ||
194 | + AcknowledgeRequest acknowledgeRequest = | ||
195 | + AcknowledgeRequest.newBuilder() | ||
196 | + .setSubscription(subscriptionName) | ||
197 | + .addAllAckIds(ackIds) | ||
198 | + .build(); | ||
199 | + | ||
200 | + acknowledgeRequests.add(acknowledgeRequest); | ||
201 | + return pullResponse.getReceivedMessagesList(); | ||
202 | + } | ||
203 | + return null; | ||
204 | + }, consumerExecutor); | ||
205 | + | ||
206 | + }).collect(Collectors.toList()); | ||
207 | + | ||
208 | + ApiFuture<List<ReceivedMessage>> transform = ApiFutures.transform(ApiFutures.allAsList(result), listMessages -> { | ||
209 | + if (!CollectionUtils.isEmpty(listMessages)) { | ||
210 | + return listMessages.stream().filter(Objects::nonNull).flatMap(List::stream).collect(Collectors.toList()); | ||
211 | + } | ||
212 | + return Collections.emptyList(); | ||
213 | + }, consumerExecutor); | ||
214 | + | ||
215 | + return transform.get(); | ||
216 | + } | ||
217 | + | ||
218 | + public T decode(PubsubMessage message) throws InvalidProtocolBufferException { | ||
219 | + DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class); | ||
220 | + TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); | ||
221 | + Map<String, byte[]> headerMap = gson.fromJson(message.getAttributesMap().get("headers"), new TypeToken<Map<String, byte[]>>() { | ||
222 | + }.getType()); | ||
223 | + headerMap.forEach(headers::put); | ||
224 | + msg.setHeaders(headers); | ||
225 | + return decoder.decode(msg); | ||
226 | + } | ||
227 | + | ||
228 | +} |
common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java
0 → 100644
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.pubsub; | ||
17 | + | ||
18 | +import com.google.api.core.ApiFuture; | ||
19 | +import com.google.api.core.ApiFutureCallback; | ||
20 | +import com.google.api.core.ApiFutures; | ||
21 | +import com.google.cloud.pubsub.v1.Publisher; | ||
22 | +import com.google.gson.Gson; | ||
23 | +import com.google.protobuf.ByteString; | ||
24 | +import com.google.pubsub.v1.ProjectTopicName; | ||
25 | +import com.google.pubsub.v1.PubsubMessage; | ||
26 | +import lombok.extern.slf4j.Slf4j; | ||
27 | +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; | ||
28 | +import org.thingsboard.server.queue.TbQueueAdmin; | ||
29 | +import org.thingsboard.server.queue.TbQueueCallback; | ||
30 | +import org.thingsboard.server.queue.TbQueueMsg; | ||
31 | +import org.thingsboard.server.queue.TbQueueProducer; | ||
32 | +import org.thingsboard.server.queue.common.DefaultTbQueueMsg; | ||
33 | + | ||
34 | +import java.io.IOException; | ||
35 | +import java.util.Map; | ||
36 | +import java.util.concurrent.ConcurrentHashMap; | ||
37 | +import java.util.concurrent.ExecutorService; | ||
38 | +import java.util.concurrent.Executors; | ||
39 | +import java.util.concurrent.TimeUnit; | ||
40 | + | ||
41 | +@Slf4j | ||
42 | +public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueueProducer<T> { | ||
43 | + | ||
44 | + private final Gson gson = new Gson(); | ||
45 | + | ||
46 | + private final String defaultTopic; | ||
47 | + private final TbQueueAdmin admin; | ||
48 | + private final TbPubSubSettings pubSubSettings; | ||
49 | + | ||
50 | + private final Map<String, Publisher> publisherMap = new ConcurrentHashMap<>(); | ||
51 | + | ||
52 | + private ExecutorService pubExecutor = Executors.newCachedThreadPool(); | ||
53 | + | ||
54 | + public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) { | ||
55 | + this.defaultTopic = defaultTopic; | ||
56 | + this.admin = admin; | ||
57 | + this.pubSubSettings = pubSubSettings; | ||
58 | + } | ||
59 | + | ||
60 | + @Override | ||
61 | + public void init() { | ||
62 | + | ||
63 | + } | ||
64 | + | ||
65 | + @Override | ||
66 | + public String getDefaultTopic() { | ||
67 | + return defaultTopic; | ||
68 | + } | ||
69 | + | ||
70 | + @Override | ||
71 | + public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { | ||
72 | + PubsubMessage.Builder pubsubMessageBuilder = PubsubMessage.newBuilder(); | ||
73 | + pubsubMessageBuilder.setData(getMsg(msg)); | ||
74 | + pubsubMessageBuilder.putAttributes("headers", gson.toJson(msg.getHeaders().getData())); | ||
75 | + | ||
76 | + Publisher publisher = getOrCreatePublisher(tpi.getFullTopicName()); | ||
77 | + ApiFuture<String> future = publisher.publish(pubsubMessageBuilder.build()); | ||
78 | + | ||
79 | + ApiFutures.addCallback(future, new ApiFutureCallback<String>() { | ||
80 | + public void onSuccess(String messageId) { | ||
81 | + if (callback != null) { | ||
82 | + callback.onSuccess(null); | ||
83 | + } | ||
84 | + } | ||
85 | + | ||
86 | + public void onFailure(Throwable t) { | ||
87 | + if (callback != null) { | ||
88 | + callback.onFailure(t); | ||
89 | + } | ||
90 | + } | ||
91 | + }, pubExecutor); | ||
92 | + } | ||
93 | + | ||
94 | + @Override | ||
95 | + public void stop() { | ||
96 | + publisherMap.forEach((k, v) -> { | ||
97 | + if (v != null) { | ||
98 | + try { | ||
99 | + v.shutdown(); | ||
100 | + v.awaitTermination(1, TimeUnit.SECONDS); | ||
101 | + } catch (Exception e) { | ||
102 | + log.error("Failed to shutdown PubSub client during destroy()", e); | ||
103 | + } | ||
104 | + } | ||
105 | + }); | ||
106 | + | ||
107 | + if (pubExecutor != null) { | ||
108 | + pubExecutor.shutdownNow(); | ||
109 | + } | ||
110 | + } | ||
111 | + | ||
112 | + private ByteString getMsg(T msg) { | ||
113 | + String json = gson.toJson(new DefaultTbQueueMsg(msg.getKey(), msg.getData())); | ||
114 | + return ByteString.copyFrom(json.getBytes()); | ||
115 | + } | ||
116 | + | ||
117 | + private Publisher getOrCreatePublisher(String topic) { | ||
118 | + if (publisherMap.containsKey(topic)) { | ||
119 | + return publisherMap.get(topic); | ||
120 | + } else { | ||
121 | + try { | ||
122 | + admin.createTopicIfNotExists(topic); | ||
123 | + ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), topic); | ||
124 | + Publisher publisher = Publisher.newBuilder(topicName).setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); | ||
125 | + publisherMap.put(topic, publisher); | ||
126 | + return publisher; | ||
127 | + } catch (IOException e) { | ||
128 | + log.error("Failed to create topic [{}].", topic, e); | ||
129 | + throw new RuntimeException("Failed to create topic.", e); | ||
130 | + } | ||
131 | + } | ||
132 | + | ||
133 | + } | ||
134 | + | ||
135 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2020 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.queue.pubsub; | ||
17 | + | ||
18 | +import com.google.api.gax.core.CredentialsProvider; | ||
19 | +import com.google.api.gax.core.FixedCredentialsProvider; | ||
20 | +import com.google.auth.oauth2.ServiceAccountCredentials; | ||
21 | +import lombok.Data; | ||
22 | +import lombok.extern.slf4j.Slf4j; | ||
23 | +import org.springframework.beans.factory.annotation.Value; | ||
24 | +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; | ||
25 | +import org.springframework.stereotype.Component; | ||
26 | + | ||
27 | +import javax.annotation.PostConstruct; | ||
28 | +import java.io.ByteArrayInputStream; | ||
29 | +import java.io.IOException; | ||
30 | + | ||
31 | +@Slf4j | ||
32 | +@ConditionalOnExpression("'${queue.type:null}'=='pubsub'") | ||
33 | +@Component | ||
34 | +@Data | ||
35 | +public class TbPubSubSettings { | ||
36 | + | ||
37 | + @Value("${queue.pubsub.project_id}") | ||
38 | + private String projectId; | ||
39 | + | ||
40 | + @Value("${queue.pubsub.service_account}") | ||
41 | + private String serviceAccount; | ||
42 | + | ||
43 | + @Value("${queue.pubsub.ack_deadline}") | ||
44 | + private int ackDeadline; | ||
45 | + | ||
46 | + @Value("${queue.pubsub.max_msg_size}") | ||
47 | + private int maxMsgSize; | ||
48 | + | ||
49 | + @Value("${queue.pubsub.max_messages}") | ||
50 | + private int maxMessages; | ||
51 | + | ||
52 | + private CredentialsProvider credentialsProvider; | ||
53 | + | ||
54 | + @PostConstruct | ||
55 | + private void init() throws IOException { | ||
56 | + ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream( | ||
57 | + new ByteArrayInputStream(serviceAccount.getBytes())); | ||
58 | + credentialsProvider = FixedCredentialsProvider.create(credentials); | ||
59 | + } | ||
60 | + | ||
61 | +} |
@@ -39,6 +39,7 @@ import org.thingsboard.server.queue.TbQueueConsumer; | @@ -39,6 +39,7 @@ import org.thingsboard.server.queue.TbQueueConsumer; | ||
39 | import org.thingsboard.server.queue.TbQueueMsg; | 39 | import org.thingsboard.server.queue.TbQueueMsg; |
40 | import org.thingsboard.server.queue.TbQueueMsgDecoder; | 40 | import org.thingsboard.server.queue.TbQueueMsgDecoder; |
41 | import org.thingsboard.server.queue.TbQueueMsgHeaders; | 41 | import org.thingsboard.server.queue.TbQueueMsgHeaders; |
42 | +import org.thingsboard.server.queue.common.DefaultTbQueueMsg; | ||
42 | import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders; | 43 | import org.thingsboard.server.queue.common.DefaultTbQueueMsgHeaders; |
43 | 44 | ||
44 | import java.io.IOException; | 45 | import java.io.IOException; |
@@ -212,7 +213,7 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo | @@ -212,7 +213,7 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo | ||
212 | } | 213 | } |
213 | 214 | ||
214 | public T decode(Message message) throws InvalidProtocolBufferException { | 215 | public T decode(Message message) throws InvalidProtocolBufferException { |
215 | - TbAwsSqsMsg msg = gson.fromJson(message.getBody(), TbAwsSqsMsg.class); | 216 | + DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class); |
216 | TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); | 217 | TbQueueMsgHeaders headers = new DefaultTbQueueMsgHeaders(); |
217 | Map<String, byte[]> headerMap = gson.fromJson(message.getMessageAttributes().get("headers").getStringValue(), new TypeToken<Map<String, byte[]>>() { | 218 | Map<String, byte[]> headerMap = gson.fromJson(message.getMessageAttributes().get("headers").getStringValue(), new TypeToken<Map<String, byte[]>>() { |
218 | }.getType()); | 219 | }.getType()); |
@@ -35,6 +35,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; | @@ -35,6 +35,7 @@ import org.thingsboard.server.queue.TbQueueAdmin; | ||
35 | import org.thingsboard.server.queue.TbQueueCallback; | 35 | import org.thingsboard.server.queue.TbQueueCallback; |
36 | import org.thingsboard.server.queue.TbQueueMsg; | 36 | import org.thingsboard.server.queue.TbQueueMsg; |
37 | import org.thingsboard.server.queue.TbQueueProducer; | 37 | import org.thingsboard.server.queue.TbQueueProducer; |
38 | +import org.thingsboard.server.queue.common.DefaultTbQueueMsg; | ||
38 | 39 | ||
39 | import java.util.HashMap; | 40 | import java.util.HashMap; |
40 | import java.util.Map; | 41 | import java.util.Map; |
@@ -79,7 +80,7 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr | @@ -79,7 +80,7 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr | ||
79 | public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { | 80 | public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { |
80 | SendMessageRequest sendMsgRequest = new SendMessageRequest(); | 81 | SendMessageRequest sendMsgRequest = new SendMessageRequest(); |
81 | sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); | 82 | sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); |
82 | - sendMsgRequest.withMessageBody(gson.toJson(new TbAwsSqsMsg(msg.getKey(), msg.getData()))); | 83 | + sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg.getKey(), msg.getData()))); |
83 | 84 | ||
84 | Map<String, MessageAttributeValue> attributes = new HashMap<>(); | 85 | Map<String, MessageAttributeValue> attributes = new HashMap<>(); |
85 | 86 |
@@ -63,7 +63,7 @@ | @@ -63,7 +63,7 @@ | ||
63 | <mail.version>1.4.3</mail.version> | 63 | <mail.version>1.4.3</mail.version> |
64 | <curator.version>4.2.0</curator.version> | 64 | <curator.version>4.2.0</curator.version> |
65 | <zookeeper.version>3.5.5</zookeeper.version> | 65 | <zookeeper.version>3.5.5</zookeeper.version> |
66 | - <protobuf.version>3.6.1</protobuf.version> | 66 | + <protobuf.version>3.9.1</protobuf.version> |
67 | <grpc.version>1.22.1</grpc.version> | 67 | <grpc.version>1.22.1</grpc.version> |
68 | <lombok.version>1.16.18</lombok.version> | 68 | <lombok.version>1.16.18</lombok.version> |
69 | <paho.client.version>1.1.0</paho.client.version> | 69 | <paho.client.version>1.1.0</paho.client.version> |
@@ -93,6 +93,7 @@ | @@ -93,6 +93,7 @@ | ||
93 | <antlr.version>2.7.7</antlr.version> | 93 | <antlr.version>2.7.7</antlr.version> |
94 | <snakeyaml.version>1.23</snakeyaml.version> | 94 | <snakeyaml.version>1.23</snakeyaml.version> |
95 | <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version> | 95 | <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version> |
96 | + <pubsub.client.version>1.84.0</pubsub.client.version> | ||
96 | <passay.version>1.5.0</passay.version> | 97 | <passay.version>1.5.0</passay.version> |
97 | <ua-parser.version>1.4.3</ua-parser.version> | 98 | <ua-parser.version>1.4.3</ua-parser.version> |
98 | </properties> | 99 | </properties> |
@@ -888,10 +889,15 @@ | @@ -888,10 +889,15 @@ | ||
888 | <version>${jts.version}</version> | 889 | <version>${jts.version}</version> |
889 | </dependency> | 890 | </dependency> |
890 | <dependency> | 891 | <dependency> |
891 | - <groupId>com.amazonaws</groupId> | ||
892 | - <artifactId>aws-java-sdk-sqs</artifactId> | ||
893 | - <version>${amazonaws.sqs.version}</version> | ||
894 | - </dependency> | 892 | + <groupId>com.amazonaws</groupId> |
893 | + <artifactId>aws-java-sdk-sqs</artifactId> | ||
894 | + <version>${amazonaws.sqs.version}</version> | ||
895 | + </dependency> | ||
896 | + <dependency> | ||
897 | + <groupId>com.google.cloud</groupId> | ||
898 | + <artifactId>google-cloud-pubsub</artifactId> | ||
899 | + <version>${pubsub.client.version}</version> | ||
900 | + </dependency> | ||
895 | <dependency> | 901 | <dependency> |
896 | <groupId>org.passay</groupId> | 902 | <groupId>org.passay</groupId> |
897 | <artifactId>passay</artifactId> | 903 | <artifactId>passay</artifactId> |
@@ -36,7 +36,6 @@ | @@ -36,7 +36,6 @@ | ||
36 | <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | 36 | <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
37 | <main.dir>${basedir}/../..</main.dir> | 37 | <main.dir>${basedir}/../..</main.dir> |
38 | <aws.sdk.version>1.11.747</aws.sdk.version> | 38 | <aws.sdk.version>1.11.747</aws.sdk.version> |
39 | - <pubsub.client.version>1.83.0</pubsub.client.version> | ||
40 | <google.common.protos.version>1.16.0</google.common.protos.version> | 39 | <google.common.protos.version>1.16.0</google.common.protos.version> |
41 | </properties> | 40 | </properties> |
42 | 41 | ||
@@ -99,7 +98,6 @@ | @@ -99,7 +98,6 @@ | ||
99 | <dependency> | 98 | <dependency> |
100 | <groupId>com.google.cloud</groupId> | 99 | <groupId>com.google.cloud</groupId> |
101 | <artifactId>google-cloud-pubsub</artifactId> | 100 | <artifactId>google-cloud-pubsub</artifactId> |
102 | - <version>${pubsub.client.version}</version> | ||
103 | </dependency> | 101 | </dependency> |
104 | <dependency> | 102 | <dependency> |
105 | <groupId>com.google.api.grpc</groupId> | 103 | <groupId>com.google.api.grpc</groupId> |
@@ -63,7 +63,7 @@ transport: | @@ -63,7 +63,7 @@ transport: | ||
63 | max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" | 63 | max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" |
64 | 64 | ||
65 | queue: | 65 | queue: |
66 | - type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs | 66 | + type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub |
67 | kafka: | 67 | kafka: |
68 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" | 68 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" |
69 | acks: "${TB_KAFKA_ACKS:all}" | 69 | acks: "${TB_KAFKA_ACKS:all}" |
@@ -75,6 +75,14 @@ queue: | @@ -75,6 +75,14 @@ queue: | ||
75 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" | 75 | access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" |
76 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" | 76 | secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" |
77 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" | 77 | region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" |
78 | + threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}" | ||
79 | + visibility_timeout: "${TB_QUEUE_AWS_SQS_VISIBILITY_TIMEOUT:30}" #In seconds. If messages wont commit in this time, messages will poll again | ||
80 | + pubsub: | ||
81 | + project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}" | ||
82 | + service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}" | ||
83 | + ack_deadline: "${TB_QUEUE_PUBSUB_ACK_DEADLINE:30}" #In seconds. If messages wont commit in this time, messages will poll again | ||
84 | + max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes | ||
85 | + max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" | ||
78 | partitions: | 86 | partitions: |
79 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" | 87 | hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" |
80 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" | 88 | virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" |