Commit c7f282d39385473928f4111d015bc02ce30b07bb

Authored by Andrii Shvaika
1 parent 3d54384b

Refactoring of the Queue Consumers

  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.common;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  21 +import org.thingsboard.server.queue.TbQueueConsumer;
  22 +import org.thingsboard.server.queue.TbQueueMsg;
  23 +
  24 +import java.io.IOException;
  25 +import java.util.ArrayList;
  26 +import java.util.Collections;
  27 +import java.util.List;
  28 +import java.util.Set;
  29 +import java.util.concurrent.locks.Lock;
  30 +import java.util.concurrent.locks.ReentrantLock;
  31 +
  32 +@Slf4j
  33 +public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> {
  34 +
  35 + private volatile boolean subscribed;
  36 + protected volatile Set<TopicPartitionInfo> partitions;
  37 + protected final Lock consumerLock = new ReentrantLock();
  38 +
  39 + @Getter
  40 + private final String topic;
  41 +
  42 + public AbstractTbQueueConsumerTemplate(String topic) {
  43 + this.topic = topic;
  44 + }
  45 +
  46 + @Override
  47 + public void subscribe() {
  48 + consumerLock.lock();
  49 + try {
  50 + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
  51 + subscribed = false;
  52 + } finally {
  53 + consumerLock.unlock();
  54 + }
  55 + }
  56 +
  57 + @Override
  58 + public void subscribe(Set<TopicPartitionInfo> partitions) {
  59 + consumerLock.lock();
  60 + try {
  61 + this.partitions = partitions;
  62 + subscribed = false;
  63 + } finally {
  64 + consumerLock.unlock();
  65 + }
  66 + }
  67 +
  68 + @Override
  69 + public List<T> poll(long durationInMillis) {
  70 + if (!subscribed && partitions == null) {
  71 + try {
  72 + Thread.sleep(durationInMillis);
  73 + } catch (InterruptedException e) {
  74 + log.debug("Failed to await subscription", e);
  75 + }
  76 + } else {
  77 + consumerLock.lock();
  78 + try {
  79 + if (!subscribed) {
  80 + doSubscribe();
  81 + subscribed = true;
  82 + }
  83 +
  84 + List<R> records = doPoll(durationInMillis);
  85 + if (!records.isEmpty()) {
  86 + List<T> result = new ArrayList<>(records.size());
  87 + records.forEach(record -> {
  88 + try {
  89 + if (record != null) {
  90 + result.add(decode(record));
  91 + }
  92 + } catch (IOException e) {
  93 + log.error("Failed decode record: [{}]", record);
  94 + throw new RuntimeException("Failed to decode record: ", e);
  95 + }
  96 + });
  97 + return result;
  98 + }
  99 + } finally {
  100 + consumerLock.unlock();
  101 + }
  102 + }
  103 + return Collections.emptyList();
  104 + }
  105 +
  106 + @Override
  107 + public void commit() {
  108 + consumerLock.lock();
  109 + try {
  110 + doCommit();
  111 + } finally {
  112 + consumerLock.unlock();
  113 + }
  114 + }
  115 +
  116 + @Override
  117 + public void unsubscribe() {
  118 + consumerLock.lock();
  119 + try {
  120 + doUnsubscribe();
  121 + } finally {
  122 + consumerLock.unlock();
  123 + }
  124 + }
  125 +
  126 + abstract protected List<R> doPoll(long durationInMillis);
  127 +
  128 + abstract protected T decode(R record) throws IOException;
  129 +
  130 + abstract protected void doSubscribe();
  131 +
  132 + abstract protected void doCommit();
  133 +
  134 + abstract protected void doUnsubscribe();
  135 +
  136 +}
@@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
16 package org.thingsboard.server.queue.kafka; 16 package org.thingsboard.server.queue.kafka;
17 17
18 import lombok.Builder; 18 import lombok.Builder;
19 -import lombok.Getter;  
20 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
21 import org.apache.kafka.clients.consumer.ConsumerConfig; 20 import org.apache.kafka.clients.consumer.ConsumerConfig;
22 import org.apache.kafka.clients.consumer.ConsumerRecord; 21 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -24,8 +23,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -24,8 +23,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
24 import org.apache.kafka.clients.consumer.KafkaConsumer; 23 import org.apache.kafka.clients.consumer.KafkaConsumer;
25 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 24 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
26 import org.thingsboard.server.queue.TbQueueAdmin; 25 import org.thingsboard.server.queue.TbQueueAdmin;
27 -import org.thingsboard.server.queue.TbQueueConsumer;  
28 import org.thingsboard.server.queue.TbQueueMsg; 26 import org.thingsboard.server.queue.TbQueueMsg;
  27 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
29 28
30 import java.io.IOException; 29 import java.io.IOException;
31 import java.time.Duration; 30 import java.time.Duration;
@@ -33,26 +32,17 @@ import java.util.ArrayList; @@ -33,26 +32,17 @@ import java.util.ArrayList;
33 import java.util.Collections; 32 import java.util.Collections;
34 import java.util.List; 33 import java.util.List;
35 import java.util.Properties; 34 import java.util.Properties;
36 -import java.util.Set;  
37 -import java.util.concurrent.locks.Lock;  
38 -import java.util.concurrent.locks.ReentrantLock;  
39 import java.util.stream.Collectors; 35 import java.util.stream.Collectors;
40 36
41 /** 37 /**
42 * Created by ashvayka on 24.09.18. 38 * Created by ashvayka on 24.09.18.
43 */ 39 */
44 @Slf4j 40 @Slf4j
45 -public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 41 +public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
46 42
47 private final TbQueueAdmin admin; 43 private final TbQueueAdmin admin;
48 private final KafkaConsumer<String, byte[]> consumer; 44 private final KafkaConsumer<String, byte[]> consumer;
49 private final TbKafkaDecoder<T> decoder; 45 private final TbKafkaDecoder<T> decoder;
50 - private volatile boolean subscribed;  
51 - private volatile Set<TopicPartitionInfo> partitions;  
52 - private final Lock consumerLock;  
53 -  
54 - @Getter  
55 - private final String topic;  
56 46
57 @Builder 47 @Builder
58 private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, 48 private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
@@ -60,6 +50,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -60,6 +50,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
60 boolean autoCommit, int autoCommitIntervalMs, 50 boolean autoCommit, int autoCommitIntervalMs,
61 int maxPollRecords, 51 int maxPollRecords,
62 TbQueueAdmin admin) { 52 TbQueueAdmin admin) {
  53 + super(topic);
63 Properties props = settings.toProps(); 54 Properties props = settings.toProps();
64 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); 55 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
65 if (groupId != null) { 56 if (groupId != null) {
@@ -75,94 +66,43 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -75,94 +66,43 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
75 this.admin = admin; 66 this.admin = admin;
76 this.consumer = new KafkaConsumer<>(props); 67 this.consumer = new KafkaConsumer<>(props);
77 this.decoder = decoder; 68 this.decoder = decoder;
78 - this.topic = topic;  
79 - this.consumerLock = new ReentrantLock();  
80 } 69 }
81 70
82 @Override 71 @Override
83 - public void subscribe() {  
84 - consumerLock.lock();  
85 - try {  
86 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
87 - subscribed = false;  
88 - } finally {  
89 - consumerLock.unlock();  
90 - } 72 + protected void doSubscribe() {
  73 + List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
  74 + topicNames.forEach(admin::createTopicIfNotExists);
  75 + consumer.subscribe(topicNames);
91 } 76 }
92 77
93 @Override 78 @Override
94 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
95 - consumerLock.lock();  
96 - try {  
97 - this.partitions = partitions;  
98 - subscribed = false;  
99 - } finally {  
100 - consumerLock.unlock(); 79 + protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
  80 + ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
  81 + if (records.isEmpty()) {
  82 + return Collections.emptyList();
  83 + } else {
  84 + List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>(256);
  85 + records.forEach(recordList::add);
  86 + return recordList;
101 } 87 }
102 } 88 }
103 89
104 @Override 90 @Override
105 - public List<T> poll(long durationInMillis) {  
106 - if (!subscribed && partitions == null) {  
107 - try {  
108 - Thread.sleep(durationInMillis);  
109 - } catch (InterruptedException e) {  
110 - log.debug("Failed to await subscription", e);  
111 - }  
112 - } else {  
113 - consumerLock.lock();  
114 - try {  
115 - if (!subscribed) {  
116 - List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());  
117 - topicNames.forEach(admin::createTopicIfNotExists);  
118 - consumer.subscribe(topicNames);  
119 - subscribed = true;  
120 - }  
121 -  
122 - ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));  
123 - if (records.count() > 0) {  
124 - List<T> result = new ArrayList<>();  
125 - records.forEach(record -> {  
126 - try {  
127 - result.add(decode(record));  
128 - } catch (IOException e) {  
129 - log.error("Failed decode record: [{}]", record);  
130 - }  
131 - });  
132 - return result;  
133 - }  
134 - } finally {  
135 - consumerLock.unlock();  
136 - }  
137 - }  
138 - return Collections.emptyList(); 91 + public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
  92 + return decoder.decode(new KafkaTbQueueMsg(record));
139 } 93 }
140 94
141 @Override 95 @Override
142 - public void commit() {  
143 - consumerLock.lock();  
144 - try {  
145 - consumer.commitAsync();  
146 - } finally {  
147 - consumerLock.unlock();  
148 - } 96 + protected void doCommit() {
  97 + consumer.commitAsync();
149 } 98 }
150 99
151 @Override 100 @Override
152 - public void unsubscribe() {  
153 - consumerLock.lock();  
154 - try {  
155 - if (consumer != null) {  
156 - consumer.unsubscribe();  
157 - consumer.close();  
158 - }  
159 - } finally {  
160 - consumerLock.unlock(); 101 + protected void doUnsubscribe() {
  102 + if (consumer != null) {
  103 + consumer.unsubscribe();
  104 + consumer.close();
161 } 105 }
162 } 106 }
163 107
164 - public T decode(ConsumerRecord<String, byte[]> record) throws IOException {  
165 - return decoder.decode(new KafkaTbQueueMsg(record));  
166 - }  
167 -  
168 } 108 }