Commit e102b55bc139deff72c554f8e7f6e7c9f0c029a4

Authored by Andrii Shvaika
1 parent 7203e7aa

Thread-safety usage of Kafka Consumer

@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -293,12 +293,10 @@ public class DefaultDeviceStateService implements DeviceStateService { @@ -293,12 +293,10 @@ public class DefaultDeviceStateService implements DeviceStateService {
293 callback.onSuccess(); 293 callback.onSuccess();
294 } 294 }
295 } 295 }
296 -  
297 } catch (Exception e) { 296 } catch (Exception e) {
298 log.trace("Failed to process queue msg: [{}]", proto, e); 297 log.trace("Failed to process queue msg: [{}]", proto, e);
299 callback.onFailure(e); 298 callback.onFailure(e);
300 } 299 }
301 -  
302 } 300 }
303 301
304 @Override 302 @Override
@@ -366,6 +364,10 @@ public class DefaultDeviceStateService implements DeviceStateService { @@ -366,6 +364,10 @@ public class DefaultDeviceStateService implements DeviceStateService {
366 } 364 }
367 } 365 }
368 } 366 }
  367 + log.info("Managing following partitions:");
  368 + partitionedDevices.forEach((tpi, devices) -> {
  369 + log.info("[{}]: {} devices", tpi.getFullTopicName(), devices.size());
  370 + });
369 } catch (Throwable t) { 371 } catch (Throwable t) {
370 log.warn("Failed to init device states from DB", t); 372 log.warn("Failed to init device states from DB", t);
371 } 373 }
@@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at 6 * You may obtain a copy of the License at
7 * 7 *
8 - * http://www.apache.org/licenses/LICENSE-2.0 8 + * http://www.apache.org/licenses/LICENSE-2.0
9 * 9 *
10 * Unless required by applicable law or agreed to in writing, software 10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, 11 * distributed under the License is distributed on an "AS IS" BASIS,
@@ -45,6 +45,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -45,6 +45,7 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
45 private final KafkaConsumer<String, byte[]> consumer; 45 private final KafkaConsumer<String, byte[]> consumer;
46 private final TbKafkaDecoder<T> decoder; 46 private final TbKafkaDecoder<T> decoder;
47 private volatile boolean subscribed; 47 private volatile boolean subscribed;
  48 + private volatile Set<TopicPartitionInfo> partitions;
48 49
49 @Getter 50 @Getter
50 private final String topic; 51 private final String topic;
@@ -74,29 +75,31 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -74,29 +75,31 @@ public class TBKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
74 75
75 @Override 76 @Override
76 public void subscribe() { 77 public void subscribe() {
77 - createTopicIfNotExists(topic);  
78 - consumer.subscribe(Collections.singletonList(topic));  
79 - subscribed = true; 78 + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null));
  79 + subscribed = false;
80 } 80 }
81 81
82 @Override 82 @Override
83 public void subscribe(Set<TopicPartitionInfo> partitions) { 83 public void subscribe(Set<TopicPartitionInfo> partitions) {
84 - List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());  
85 - topicNames.forEach(this::createTopicIfNotExists);  
86 - consumer.unsubscribe();  
87 - consumer.subscribe(topicNames);  
88 - subscribed = true; 84 + this.partitions = partitions;
  85 + subscribed = false;
89 } 86 }
90 87
91 @Override 88 @Override
92 public List<T> poll(long durationInMillis) { 89 public List<T> poll(long durationInMillis) {
93 - if (!subscribed) { 90 + if (!subscribed && partitions == null) {
94 try { 91 try {
95 Thread.sleep(durationInMillis); 92 Thread.sleep(durationInMillis);
96 } catch (InterruptedException e) { 93 } catch (InterruptedException e) {
97 log.debug("Failed to await subscription", e); 94 log.debug("Failed to await subscription", e);
98 } 95 }
99 } else { 96 } else {
  97 + if (!subscribed) {
  98 + List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
  99 + topicNames.forEach(this::createTopicIfNotExists);
  100 + consumer.subscribe(topicNames);
  101 + subscribed = true;
  102 + }
100 ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis)); 103 ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
101 if (records.count() > 0) { 104 if (records.count() > 0) {
102 List<T> result = new ArrayList<>(); 105 List<T> result = new ArrayList<>();