Commit 702a8b6139e2a2ea16c42f4301b9936fabc82e5b
1 parent
9b532e18
Force unsubscribe from Kafka topics
Showing
2 changed files
with
3 additions
and
0 deletions
@@ -106,6 +106,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon | @@ -106,6 +106,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon | ||
106 | if (!subscribed) { | 106 | if (!subscribed) { |
107 | List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); | 107 | List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); |
108 | topicNames.forEach(admin::createTopicIfNotExists); | 108 | topicNames.forEach(admin::createTopicIfNotExists); |
109 | + consumer.unsubscribe(); | ||
109 | consumer.subscribe(topicNames); | 110 | consumer.subscribe(topicNames); |
110 | subscribed = true; | 111 | subscribed = true; |
111 | } | 112 | } |