Commit d866fc1503133c3a67692993e256c03cbc6fb87f
Merge remote-tracking branch 'origin/master' into develop/3.0
Showing
3 changed files
with
4 additions
and
1 deletions
@@ -3,7 +3,7 @@ | @@ -3,7 +3,7 @@ | ||
3 | "additionalInfo": null, | 3 | "additionalInfo": null, |
4 | "name": "Root Rule Chain", | 4 | "name": "Root Rule Chain", |
5 | "firstRuleNodeId": null, | 5 | "firstRuleNodeId": null, |
6 | - "root": false, | 6 | + "root": true, |
7 | "debugMode": false, | 7 | "debugMode": false, |
8 | "configuration": null | 8 | "configuration": null |
9 | }, | 9 | }, |
@@ -106,6 +106,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon | @@ -106,6 +106,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon | ||
106 | if (!subscribed) { | 106 | if (!subscribed) { |
107 | List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); | 107 | List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); |
108 | topicNames.forEach(admin::createTopicIfNotExists); | 108 | topicNames.forEach(admin::createTopicIfNotExists); |
109 | + consumer.unsubscribe(); | ||
109 | consumer.subscribe(topicNames); | 110 | consumer.subscribe(topicNames); |
110 | subscribed = true; | 111 | subscribed = true; |
111 | } | 112 | } |