Commit 09d8823205e5d517cec66ff3bd4a84e9d491ee60

Authored by Andrii Shvaika
1 parent 702a8b61

Race condition fix

... ... @@ -81,14 +81,24 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
81 81
82 82 @Override
83 83 public void subscribe() {
84   - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
85   - subscribed = false;
  84 + consumerLock.lock();
  85 + try {
  86 + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
  87 + subscribed = false;
  88 + } finally {
  89 + consumerLock.unlock();
  90 + }
86 91 }
87 92
88 93 @Override
89 94 public void subscribe(Set<TopicPartitionInfo> partitions) {
90   - this.partitions = partitions;
91   - subscribed = false;
  95 + consumerLock.lock();
  96 + try {
  97 + this.partitions = partitions;
  98 + subscribed = false;
  99 + } finally {
  100 + consumerLock.unlock();
  101 + }
92 102 }
93 103
94 104 @Override
... ... @@ -100,13 +110,11 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
100 110 log.debug("Failed to await subscription", e);
101 111 }
102 112 } else {
  113 + consumerLock.lock();
103 114 try {
104   - consumerLock.lock();
105   -
106 115 if (!subscribed) {
107 116 List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
108 117 topicNames.forEach(admin::createTopicIfNotExists);
109   - consumer.unsubscribe();
110 118 consumer.subscribe(topicNames);
111 119 subscribed = true;
112 120 }
... ... @@ -132,8 +140,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
132 140
133 141 @Override
134 142 public void commit() {
  143 + consumerLock.lock();
135 144 try {
136   - consumerLock.lock();
137 145 consumer.commitAsync();
138 146 } finally {
139 147 consumerLock.unlock();
... ... @@ -142,8 +150,8 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
142 150
143 151 @Override
144 152 public void unsubscribe() {
  153 + consumerLock.lock();
145 154 try {
146   - consumerLock.lock();
147 155 if (consumer != null) {
148 156 consumer.unsubscribe();
149 157 consumer.close();
... ...
... ... @@ -28,7 +28,7 @@ services:
28 28 ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181
29 29 kafka:
30 30 restart: always
31   - image: "wurstmeister/kafka:2.12-2.2.1"
  31 + image: "wurstmeister/kafka:2.12-2.3.0"
32 32 ports:
33 33 - "9092:9092"
34 34 env_file:
... ...