Commit 06c3caf082ee48eed660927ff453d530e965bfb6

Authored by YevhenBondarenko
1 parent eedb3838

refactored

@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.queue.pubsub; 16 package org.thingsboard.server.queue.pubsub;
17 17
  18 +import com.google.api.gax.rpc.AlreadyExistsException;
18 import com.google.cloud.pubsub.v1.SubscriptionAdminClient; 19 import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
19 import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; 20 import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
20 import com.google.cloud.pubsub.v1.TopicAdminClient; 21 import com.google.cloud.pubsub.v1.TopicAdminClient;
@@ -24,9 +25,9 @@ import com.google.pubsub.v1.ListSubscriptionsRequest; @@ -24,9 +25,9 @@ import com.google.pubsub.v1.ListSubscriptionsRequest;
24 import com.google.pubsub.v1.ListTopicsRequest; 25 import com.google.pubsub.v1.ListTopicsRequest;
25 import com.google.pubsub.v1.ProjectName; 26 import com.google.pubsub.v1.ProjectName;
26 import com.google.pubsub.v1.ProjectSubscriptionName; 27 import com.google.pubsub.v1.ProjectSubscriptionName;
27 -import com.google.pubsub.v1.ProjectTopicName;  
28 import com.google.pubsub.v1.Subscription; 28 import com.google.pubsub.v1.Subscription;
29 import com.google.pubsub.v1.Topic; 29 import com.google.pubsub.v1.Topic;
  30 +import com.google.pubsub.v1.TopicName;
30 import lombok.extern.slf4j.Slf4j; 31 import lombok.extern.slf4j.Slf4j;
31 import org.thingsboard.server.queue.TbQueueAdmin; 32 import org.thingsboard.server.queue.TbQueueAdmin;
32 33
@@ -103,7 +104,10 @@ public class TbPubSubAdmin implements TbQueueAdmin { @@ -103,7 +104,10 @@ public class TbPubSubAdmin implements TbQueueAdmin {
103 104
104 @Override 105 @Override
105 public void createTopicIfNotExists(String partition) { 106 public void createTopicIfNotExists(String partition) {
106 - ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), partition); 107 + TopicName topicName = TopicName.newBuilder()
  108 + .setTopic(partition)
  109 + .setProject(pubSubSettings.getProjectId())
  110 + .build();
107 111
108 if (topicSet.contains(topicName.toString())) { 112 if (topicSet.contains(topicName.toString())) {
109 createSubscriptionIfNotExists(partition, topicName); 113 createSubscriptionIfNotExists(partition, topicName);
@@ -121,13 +125,18 @@ public class TbPubSubAdmin implements TbQueueAdmin { @@ -121,13 +125,18 @@ public class TbPubSubAdmin implements TbQueueAdmin {
121 } 125 }
122 } 126 }
123 127
124 - topicAdminClient.createTopic(topicName);  
125 - topicSet.add(topicName.toString());  
126 - log.info("Created new topic: [{}]", topicName.toString()); 128 + try {
  129 + topicAdminClient.createTopic(topicName);
  130 + log.info("Created new topic: [{}]", topicName.toString());
  131 + } catch (AlreadyExistsException e) {
  132 + log.info("[{}] Topic already exist.", topicName.toString());
  133 + } finally {
  134 + topicSet.add(topicName.toString());
  135 + }
127 createSubscriptionIfNotExists(partition, topicName); 136 createSubscriptionIfNotExists(partition, topicName);
128 } 137 }
129 138
130 - private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) { 139 + private void createSubscriptionIfNotExists(String partition, TopicName topicName) {
131 ProjectSubscriptionName subscriptionName = 140 ProjectSubscriptionName subscriptionName =
132 ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition); 141 ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition);
133 142
@@ -153,9 +162,14 @@ public class TbPubSubAdmin implements TbQueueAdmin { @@ -153,9 +162,14 @@ public class TbPubSubAdmin implements TbQueueAdmin {
153 setAckDeadline(subscriptionBuilder); 162 setAckDeadline(subscriptionBuilder);
154 setMessageRetention(subscriptionBuilder); 163 setMessageRetention(subscriptionBuilder);
155 164
156 - subscriptionAdminClient.createSubscription(subscriptionBuilder.build());  
157 - subscriptionSet.add(subscriptionName.toString());  
158 - log.info("Created new subscription: [{}]", subscriptionName.toString()); 165 + try {
  166 + subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
  167 + log.info("Created new subscription: [{}]", subscriptionName.toString());
  168 + } catch (AlreadyExistsException e) {
  169 + log.info("[{}] Subscription already exist.", subscriptionName.toString());
  170 + } finally {
  171 + subscriptionSet.add(subscriptionName.toString());
  172 + }
159 } 173 }
160 174
161 private void setAckDeadline(Subscription.Builder builder) { 175 private void setAckDeadline(Subscription.Builder builder) {
@@ -134,6 +134,11 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -134,6 +134,11 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
134 if (!subscribed) { 134 if (!subscribed) {
135 subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet()); 135 subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet());
136 subscriptionNames.forEach(admin::createTopicIfNotExists); 136 subscriptionNames.forEach(admin::createTopicIfNotExists);
  137 +
  138 + if (consumerExecutor != null) {
  139 + consumerExecutor.shutdown();
  140 + }
  141 +
137 consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size()); 142 consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size());
138 messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size(); 143 messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
139 subscribed = true; 144 subscribed = true;
@@ -124,8 +124,8 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr @@ -124,8 +124,8 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
124 publisherMap.put(topic, publisher); 124 publisherMap.put(topic, publisher);
125 return publisher; 125 return publisher;
126 } catch (IOException e) { 126 } catch (IOException e) {
127 - log.error("Failed to create topic [{}].", topic, e);  
128 - throw new RuntimeException("Failed to create topic.", e); 127 + log.error("Failed to create Publisher for the topic [{}].", topic, e);
  128 + throw new RuntimeException("Failed to create Publisher for the topic.", e);
129 } 129 }
130 } 130 }
131 131
@@ -60,11 +60,9 @@ function PubSubProducer() { @@ -60,11 +60,9 @@ function PubSubProducer() {
60 const topicList = await pubSubClient.getTopics(); 60 const topicList = await pubSubClient.getTopics();
61 61
62 if (topicList) { 62 if (topicList) {
63 - if (topicList) {  
64 - topicList[0].forEach(topic => {  
65 - topics.push(getName(topic.name));  
66 - });  
67 - } 63 + topicList[0].forEach(topic => {
  64 + topics.push(getName(topic.name));
  65 + });
68 } 66 }
69 67
70 const subscriptionList = await pubSubClient.getSubscriptions(); 68 const subscriptionList = await pubSubClient.getSubscriptions();
@@ -100,23 +98,32 @@ function PubSubProducer() { @@ -100,23 +98,32 @@ function PubSubProducer() {
100 98
101 async function createTopic(topic) { 99 async function createTopic(topic) {
102 if (!topics.includes(topic)) { 100 if (!topics.includes(topic)) {
103 - await pubSubClient.createTopic(topic); 101 + try {
  102 + await pubSubClient.createTopic(topic);
  103 + logger.info('Created new Pub/Sub topic: %s', topic);
  104 + } catch (e) {
  105 + logger.info('Pub/Sub topic already exists');
  106 + }
104 topics.push(topic); 107 topics.push(topic);
105 - logger.info('Created new Pub/Sub topic: %s', topic);  
106 } 108 }
107 await createSubscription(topic) 109 await createSubscription(topic)
108 } 110 }
109 111
110 async function createSubscription(topic) { 112 async function createSubscription(topic) {
111 if (!subscriptions.includes(topic)) { 113 if (!subscriptions.includes(topic)) {
112 - await pubSubClient.createSubscription(topic, topic, {  
113 - topic: topic,  
114 - subscription: topic,  
115 - ackDeadlineSeconds: queueProps['ackDeadlineInSec'],  
116 - messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']}  
117 - }); 114 + try {
  115 + await pubSubClient.createSubscription(topic, topic, {
  116 + topic: topic,
  117 + subscription: topic,
  118 + ackDeadlineSeconds: queueProps['ackDeadlineInSec'],
  119 + messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']}
  120 + });
  121 + logger.info('Created new Pub/Sub subscription: %s', topic);
  122 + } catch (e) {
  123 + logger.info('Pub/Sub subscription already exists.');
  124 + }
  125 +
118 subscriptions.push(topic); 126 subscriptions.push(topic);
119 - logger.info('Created new Pub/Sub subscription: %s', topic);  
120 } 127 }
121 } 128 }
122 129
@@ -95,7 +95,7 @@ @@ -95,7 +95,7 @@
95 <snakeyaml.version>1.25</snakeyaml.version> 95 <snakeyaml.version>1.25</snakeyaml.version>
96 <struts.version>1.3.10</struts.version> 96 <struts.version>1.3.10</struts.version>
97 <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version> 97 <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version>
98 - <pubsub.client.version>1.84.0</pubsub.client.version> 98 + <pubsub.client.version>1.105.0</pubsub.client.version>
99 <azure-servicebus.version>3.2.0</azure-servicebus.version> 99 <azure-servicebus.version>3.2.0</azure-servicebus.version>
100 <passay.version>1.5.0</passay.version> 100 <passay.version>1.5.0</passay.version>
101 <ua-parser.version>1.4.3</ua-parser.version> 101 <ua-parser.version>1.4.3</ua-parser.version>