Commit 8d5c38b743a91c2ad3739c25c47f93ece5480f08

Authored by Andrii Shvaika
1 parent c7f282d3

Queue refactoring

Showing 14 changed files with 232 additions and 350 deletions
@@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; @@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
23 import org.thingsboard.server.actors.ActorSystemContext; 23 import org.thingsboard.server.actors.ActorSystemContext;
24 import org.thingsboard.server.common.msg.queue.ServiceType; 24 import org.thingsboard.server.common.msg.queue.ServiceType;
25 import org.thingsboard.server.common.msg.queue.TbCallback; 25 import org.thingsboard.server.common.msg.queue.TbCallback;
26 -import org.thingsboard.server.gen.transport.TransportProtos;  
27 import org.thingsboard.server.queue.TbQueueConsumer; 26 import org.thingsboard.server.queue.TbQueueConsumer;
28 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 27 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
29 import org.thingsboard.server.queue.discovery.PartitionChangeEvent; 28 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap; @@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
23 import java.util.Map; 23 import java.util.Map;
24 import java.util.UUID; 24 import java.util.UUID;
25 import java.util.concurrent.ConcurrentMap; 25 import java.util.concurrent.ConcurrentMap;
26 -import java.util.concurrent.ExecutorService;  
27 import java.util.concurrent.atomic.AtomicInteger; 26 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.function.BiConsumer; 27 import java.util.function.BiConsumer;
29 28
@@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS @@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
77 } 76 }
78 } 77 }
79 int submitSize = pendingPack.size(); 78 int submitSize = pendingPack.size();
80 - if (log.isInfoEnabled() && submitSize > 0) {  
81 - log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize); 79 + if (log.isDebugEnabled() && submitSize > 0) {
  80 + log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
82 } 81 }
83 pendingPack.forEach(msgConsumer); 82 pendingPack.forEach(msgConsumer);
84 } 83 }
@@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j; @@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j;
19 import org.thingsboard.server.gen.transport.TransportProtos; 19 import org.thingsboard.server.gen.transport.TransportProtos;
20 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 20 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
21 21
22 -import java.util.ArrayList;  
23 -import java.util.List;  
24 import java.util.UUID; 22 import java.util.UUID;
25 -import java.util.concurrent.ConcurrentMap;  
26 -import java.util.concurrent.ExecutorService;  
27 import java.util.function.BiConsumer; 23 import java.util.function.BiConsumer;
28 -import java.util.function.Function;  
29 -import java.util.stream.Collectors;  
30 24
31 @Slf4j 25 @Slf4j
32 public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { 26 public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
@@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS @@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
37 31
38 @Override 32 @Override
39 public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) { 33 public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
40 - if (log.isInfoEnabled()) {  
41 - log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); 34 + if (log.isDebugEnabled()) {
  35 + log.debug("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size());
42 } 36 }
43 orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg)); 37 orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg));
44 } 38 }
@@ -15,26 +15,18 @@ @@ -15,26 +15,18 @@
15 */ 15 */
16 package org.thingsboard.server.service.queue.processing; 16 package org.thingsboard.server.service.queue.processing;
17 17
18 -import com.google.protobuf.InvalidProtocolBufferException;  
19 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
20 import org.thingsboard.server.common.data.id.EntityId; 19 import org.thingsboard.server.common.data.id.EntityId;
21 -import org.thingsboard.server.common.data.id.EntityIdFactory;  
22 -import org.thingsboard.server.common.msg.TbMsg;  
23 -import org.thingsboard.server.common.msg.gen.MsgProtos;  
24 -import org.thingsboard.server.common.msg.queue.TbMsgCallback;  
25 import org.thingsboard.server.gen.transport.TransportProtos; 20 import org.thingsboard.server.gen.transport.TransportProtos;
26 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 21 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
27 22
28 -import java.util.ArrayList;  
29 import java.util.LinkedList; 23 import java.util.LinkedList;
30 import java.util.List; 24 import java.util.List;
31 import java.util.Queue; 25 import java.util.Queue;
32 import java.util.UUID; 26 import java.util.UUID;
33 import java.util.concurrent.ConcurrentHashMap; 27 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap; 28 import java.util.concurrent.ConcurrentMap;
35 -import java.util.concurrent.atomic.AtomicInteger;  
36 import java.util.function.BiConsumer; 29 import java.util.function.BiConsumer;
37 -import java.util.stream.Collectors;  
38 30
39 @Slf4j 31 @Slf4j
40 public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { 32 public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
@@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy @@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy
30 @Override 30 @Override
31 protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { 31 protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
32 return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); 32 return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
33 -  
34 } 33 }
35 } 34 }
@@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j; @@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j;
19 import org.thingsboard.server.gen.transport.TransportProtos; 19 import org.thingsboard.server.gen.transport.TransportProtos;
20 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 20 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
21 21
22 -import java.util.LinkedHashMap;  
23 -import java.util.Map;  
24 import java.util.UUID; 22 import java.util.UUID;
25 import java.util.concurrent.ConcurrentMap; 23 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.atomic.AtomicInteger; 24 import java.util.concurrent.atomic.AtomicInteger;
@@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu @@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu
63 if (idx < listSize) { 61 if (idx < listSize) {
64 IdMsgPair pair = orderedMsgList.get(idx); 62 IdMsgPair pair = orderedMsgList.get(idx);
65 expectedMsgId = pair.uuid; 63 expectedMsgId = pair.uuid;
66 - if (log.isInfoEnabled()) {  
67 - log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg); 64 + if (log.isDebugEnabled()) {
  65 + log.debug("[{}] submitting [{}] message to rule engine", queueName, pair.msg);
68 } 66 }
69 msgConsumer.accept(pair.uuid, pair.msg); 67 msgConsumer.accept(pair.uuid, pair.msg);
70 } 68 }
@@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory {
82 retryCount++; 82 retryCount++;
83 double failedCount = result.getFailedMap().size() + result.getPendingMap().size(); 83 double failedCount = result.getFailedMap().size() + result.getPendingMap().size();
84 if (maxRetries > 0 && retryCount > maxRetries) { 84 if (maxRetries > 0 && retryCount > maxRetries) {
85 - log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); 85 + log.debug("[{}] Skip reprocess of the rule engine pack due to max retries", queueName);
86 return new TbRuleEngineProcessingDecision(true, null); 86 return new TbRuleEngineProcessingDecision(true, null);
87 } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) { 87 } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) {
88 - log.info("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName); 88 + log.debug("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName);
89 return new TbRuleEngineProcessingDecision(true, null); 89 return new TbRuleEngineProcessingDecision(true, null);
90 } else { 90 } else {
91 ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toReprocess = new ConcurrentHashMap<>(initialTotalCount); 91 ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toReprocess = new ConcurrentHashMap<>(initialTotalCount);
@@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory {
98 if (retrySuccessful) { 98 if (retrySuccessful) {
99 result.getSuccessMap().forEach(toReprocess::put); 99 result.getSuccessMap().forEach(toReprocess::put);
100 } 100 }
101 - log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); 101 + log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
102 if (log.isTraceEnabled()) { 102 if (log.isTraceEnabled()) {
103 toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); 103 toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
104 } 104 }
@@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory {
126 @Override 126 @Override
127 public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { 127 public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) {
128 if (!result.isSuccess()) { 128 if (!result.isSuccess()) {
129 - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); 129 + log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
130 } 130 }
131 if (log.isTraceEnabled()) { 131 if (log.isTraceEnabled()) {
132 result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); 132 result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
@@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode; @@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
31 import org.springframework.util.CollectionUtils; 31 import org.springframework.util.CollectionUtils;
32 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 32 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
33 import org.thingsboard.server.queue.TbQueueAdmin; 33 import org.thingsboard.server.queue.TbQueueAdmin;
34 -import org.thingsboard.server.queue.TbQueueConsumer;  
35 import org.thingsboard.server.queue.TbQueueMsg; 34 import org.thingsboard.server.queue.TbQueueMsg;
36 import org.thingsboard.server.queue.TbQueueMsgDecoder; 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  36 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
38 38
39 import java.time.Duration; 39 import java.time.Duration;
@@ -50,102 +50,72 @@ import java.util.stream.Collectors; @@ -50,102 +50,72 @@ import java.util.stream.Collectors;
50 import java.util.stream.Stream; 50 import java.util.stream.Stream;
51 51
52 @Slf4j 52 @Slf4j
53 -public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 53 +public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<MessageWithDeliveryTag, T> {
54 private final TbQueueAdmin admin; 54 private final TbQueueAdmin admin;
55 - private final String topic;  
56 private final TbQueueMsgDecoder<T> decoder; 55 private final TbQueueMsgDecoder<T> decoder;
57 private final TbServiceBusSettings serviceBusSettings; 56 private final TbServiceBusSettings serviceBusSettings;
58 57
59 private final Gson gson = new Gson(); 58 private final Gson gson = new Gson();
60 59
61 private Set<CoreMessageReceiver> receivers; 60 private Set<CoreMessageReceiver> receivers;
62 - private volatile Set<TopicPartitionInfo> partitions;  
63 - private volatile boolean subscribed;  
64 - private volatile boolean stopped = false;  
65 private Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>(); 61 private Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
66 private volatile int messagesPerQueue; 62 private volatile int messagesPerQueue;
67 63
68 public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) { 64 public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  65 + super(topic);
69 this.admin = admin; 66 this.admin = admin;
70 this.decoder = decoder; 67 this.decoder = decoder;
71 - this.topic = topic;  
72 this.serviceBusSettings = serviceBusSettings; 68 this.serviceBusSettings = serviceBusSettings;
73 } 69 }
74 70
75 @Override 71 @Override
76 - public String getTopic() {  
77 - return topic; 72 + protected List<MessageWithDeliveryTag> doPoll(long durationInMillis) {
  73 + List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =
  74 + receivers.stream()
  75 + .map(receiver -> receiver
  76 + .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))
  77 + .whenComplete((messages, err) -> {
  78 + if (!CollectionUtils.isEmpty(messages)) {
  79 + pendingMessages.put(receiver, messages);
  80 + } else if (err != null) {
  81 + log.error("Failed to receive messages.", err);
  82 + }
  83 + }))
  84 + .collect(Collectors.toList());
  85 + try {
  86 + return fromList(messageFutures)
  87 + .get()
  88 + .stream()
  89 + .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())
  90 + .collect(Collectors.toList());
  91 + } catch (InterruptedException | ExecutionException e) {
  92 + if (stopped) {
  93 + log.info("[{}] Service Bus consumer is stopped.", getTopic());
  94 + } else {
  95 + log.error("Failed to receive messages", e);
  96 + }
  97 + return Collections.emptyList();
  98 + }
78 } 99 }
79 100
80 @Override 101 @Override
81 - public void subscribe() {  
82 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
83 - subscribed = false; 102 + protected void doSubscribe(List<String> topicNames) {
  103 + createReceivers();
  104 + messagesPerQueue = receivers.size() / partitions.size();
84 } 105 }
85 106
86 @Override 107 @Override
87 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
88 - this.partitions = partitions;  
89 - subscribed = false; 108 + protected void doCommit() {
  109 + pendingMessages.forEach((receiver, msgs) ->
  110 + msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
  111 + pendingMessages.clear();
90 } 112 }
91 113
92 @Override 114 @Override
93 - public void unsubscribe() {  
94 - stopped = true; 115 + protected void doUnsubscribe() {
95 receivers.forEach(CoreMessageReceiver::closeAsync); 116 receivers.forEach(CoreMessageReceiver::closeAsync);
96 } 117 }
97 118
98 - @Override  
99 - public List<T> poll(long durationInMillis) {  
100 - if (!subscribed && partitions == null) {  
101 - try {  
102 - Thread.sleep(durationInMillis);  
103 - } catch (InterruptedException e) {  
104 - log.debug("Failed to await subscription", e);  
105 - }  
106 - } else {  
107 - if (!subscribed) {  
108 - createReceivers();  
109 - messagesPerQueue = receivers.size() / partitions.size();  
110 - subscribed = true;  
111 - }  
112 -  
113 - List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =  
114 - receivers.stream()  
115 - .map(receiver -> receiver  
116 - .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))  
117 - .whenComplete((messages, err) -> {  
118 - if (!CollectionUtils.isEmpty(messages)) {  
119 - pendingMessages.put(receiver, messages);  
120 - } else if (err != null) {  
121 - log.error("Failed to receive messages.", err);  
122 - }  
123 - }))  
124 - .collect(Collectors.toList());  
125 - try {  
126 - return fromList(messageFutures)  
127 - .get()  
128 - .stream()  
129 - .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())  
130 - .map(message -> {  
131 - try {  
132 - return decode(message);  
133 - } catch (InvalidProtocolBufferException e) {  
134 - log.error("Failed to parse message.", e);  
135 - throw new RuntimeException("Failed to parse message.", e);  
136 - }  
137 - }).collect(Collectors.toList());  
138 - } catch (InterruptedException | ExecutionException e) {  
139 - if (stopped) {  
140 - log.info("[{}] Service Bus consumer is stopped.", topic);  
141 - } else {  
142 - log.error("Failed to receive messages", e);  
143 - }  
144 - }  
145 - }  
146 - return Collections.emptyList();  
147 - }  
148 -  
149 private void createReceivers() { 119 private void createReceivers() {
150 List<CompletableFuture<CoreMessageReceiver>> receiverFutures = partitions.stream() 120 List<CompletableFuture<CoreMessageReceiver>> receiverFutures = partitions.stream()
151 .map(TopicPartitionInfo::getFullTopicName) 121 .map(TopicPartitionInfo::getFullTopicName)
@@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue @@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue
167 receivers = new HashSet<>(fromList(receiverFutures).get()); 137 receivers = new HashSet<>(fromList(receiverFutures).get());
168 } catch (InterruptedException | ExecutionException e) { 138 } catch (InterruptedException | ExecutionException e) {
169 if (stopped) { 139 if (stopped) {
170 - log.info("[{}] Service Bus consumer is stopped.", topic); 140 + log.info("[{}] Service Bus consumer is stopped.", getTopic());
171 } else { 141 } else {
172 log.error("Failed to create receivers", e); 142 log.error("Failed to create receivers", e);
173 } 143 }
@@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue @@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue
196 } 166 }
197 167
198 @Override 168 @Override
199 - public void commit() {  
200 - pendingMessages.forEach((receiver, msgs) ->  
201 - msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));  
202 - pendingMessages.clear();  
203 - }  
204 -  
205 - private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { 169 + protected T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
206 DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class); 170 DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class);
207 return decoder.decode(msg); 171 return decoder.decode(msg);
208 } 172 }
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.common;
  17 +
  18 +import com.google.common.util.concurrent.ListeningExecutorService;
  19 +import com.google.common.util.concurrent.MoreExecutors;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.queue.TbQueueMsg;
  22 +
  23 +import java.util.concurrent.Executors;
  24 +import java.util.concurrent.TimeUnit;
  25 +
  26 +@Slf4j
  27 +public abstract class AbstractParallelTbQueueConsumerTemplate<R, T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<R, T> {
  28 +
  29 + protected ListeningExecutorService consumerExecutor;
  30 +
  31 + public AbstractParallelTbQueueConsumerTemplate(String topic) {
  32 + super(topic);
  33 + }
  34 +
  35 + protected void initNewExecutor(int threadPoolSize) {
  36 + if (consumerExecutor != null) {
  37 + consumerExecutor.shutdown();
  38 + try {
  39 + consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
  40 + } catch (InterruptedException e) {
  41 + log.trace("Interrupted while waiting for consumer executor to stop");
  42 + }
  43 + }
  44 + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize));
  45 + }
  46 +
  47 + protected void shutdownExecutor() {
  48 + if (consumerExecutor != null) {
  49 + consumerExecutor.shutdownNow();
  50 + }
  51 + }
  52 +
  53 +}
@@ -28,11 +28,13 @@ import java.util.List; @@ -28,11 +28,13 @@ import java.util.List;
28 import java.util.Set; 28 import java.util.Set;
29 import java.util.concurrent.locks.Lock; 29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock; 30 import java.util.concurrent.locks.ReentrantLock;
  31 +import java.util.stream.Collectors;
31 32
32 @Slf4j 33 @Slf4j
33 public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> { 34 public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> {
34 35
35 private volatile boolean subscribed; 36 private volatile boolean subscribed;
  37 + protected volatile boolean stopped = false;
36 protected volatile Set<TopicPartitionInfo> partitions; 38 protected volatile Set<TopicPartitionInfo> partitions;
37 protected final Lock consumerLock = new ReentrantLock(); 39 protected final Lock consumerLock = new ReentrantLock();
38 40
@@ -74,10 +76,12 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i @@ -74,10 +76,12 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
74 log.debug("Failed to await subscription", e); 76 log.debug("Failed to await subscription", e);
75 } 77 }
76 } else { 78 } else {
  79 + long pollStartTs = System.currentTimeMillis();
77 consumerLock.lock(); 80 consumerLock.lock();
78 try { 81 try {
79 if (!subscribed) { 82 if (!subscribed) {
80 - doSubscribe(); 83 + List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
  84 + doSubscribe(topicNames);
81 subscribed = true; 85 subscribed = true;
82 } 86 }
83 87
@@ -95,6 +99,17 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i @@ -95,6 +99,17 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
95 } 99 }
96 }); 100 });
97 return result; 101 return result;
  102 + } else {
  103 + long pollDuration = System.currentTimeMillis() - pollStartTs;
  104 + if (pollDuration < durationInMillis) {
  105 + try {
  106 + Thread.sleep(durationInMillis - pollDuration);
  107 + } catch (InterruptedException e) {
  108 + if (!stopped) {
  109 + log.error("Failed to wait.", e);
  110 + }
  111 + }
  112 + }
98 } 113 }
99 } finally { 114 } finally {
100 consumerLock.unlock(); 115 consumerLock.unlock();
@@ -115,6 +130,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i @@ -115,6 +130,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
115 130
116 @Override 131 @Override
117 public void unsubscribe() { 132 public void unsubscribe() {
  133 + stopped = true;
118 consumerLock.lock(); 134 consumerLock.lock();
119 try { 135 try {
120 doUnsubscribe(); 136 doUnsubscribe();
@@ -127,7 +143,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i @@ -127,7 +143,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
127 143
128 abstract protected T decode(R record) throws IOException; 144 abstract protected T decode(R record) throws IOException;
129 145
130 - abstract protected void doSubscribe(); 146 + abstract protected void doSubscribe(List<String> topicNames);
131 147
132 abstract protected void doCommit(); 148 abstract protected void doCommit();
133 149
@@ -69,8 +69,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue @@ -69,8 +69,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
69 } 69 }
70 70
71 @Override 71 @Override
72 - protected void doSubscribe() {  
73 - List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); 72 + protected void doSubscribe( List<String> topicNames) {
74 topicNames.forEach(admin::createTopicIfNotExists); 73 topicNames.forEach(admin::createTopicIfNotExists);
75 consumer.subscribe(topicNames); 74 consumer.subscribe(topicNames);
76 } 75 }
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.queue.pubsub; 16 package org.thingsboard.server.queue.pubsub;
17 17
  18 +import com.amazonaws.services.sqs.model.Message;
18 import com.google.api.core.ApiFuture; 19 import com.google.api.core.ApiFuture;
19 import com.google.api.core.ApiFutures; 20 import com.google.api.core.ApiFutures;
20 import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; 21 import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
@@ -35,11 +36,14 @@ import org.thingsboard.server.queue.TbQueueAdmin; @@ -35,11 +36,14 @@ import org.thingsboard.server.queue.TbQueueAdmin;
35 import org.thingsboard.server.queue.TbQueueConsumer; 36 import org.thingsboard.server.queue.TbQueueConsumer;
36 import org.thingsboard.server.queue.TbQueueMsg; 37 import org.thingsboard.server.queue.TbQueueMsg;
37 import org.thingsboard.server.queue.TbQueueMsgDecoder; 38 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  39 +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
  40 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
38 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 41 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
39 42
40 import java.io.IOException; 43 import java.io.IOException;
41 import java.util.ArrayList; 44 import java.util.ArrayList;
42 import java.util.Collections; 45 import java.util.Collections;
  46 +import java.util.LinkedHashSet;
43 import java.util.List; 47 import java.util.List;
44 import java.util.Objects; 48 import java.util.Objects;
45 import java.util.Set; 49 import java.util.Set;
@@ -47,10 +51,11 @@ import java.util.concurrent.CopyOnWriteArrayList; @@ -47,10 +51,11 @@ import java.util.concurrent.CopyOnWriteArrayList;
47 import java.util.concurrent.ExecutionException; 51 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.ExecutorService; 52 import java.util.concurrent.ExecutorService;
49 import java.util.concurrent.Executors; 53 import java.util.concurrent.Executors;
  54 +import java.util.concurrent.TimeUnit;
50 import java.util.stream.Collectors; 55 import java.util.stream.Collectors;
51 56
52 @Slf4j 57 @Slf4j
53 -public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 58 +public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<PubsubMessage, T> {
54 59
55 private final Gson gson = new Gson(); 60 private final Gson gson = new Gson();
56 private final TbQueueAdmin admin; 61 private final TbQueueAdmin admin;
@@ -58,23 +63,18 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -58,23 +63,18 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
58 private final TbQueueMsgDecoder<T> decoder; 63 private final TbQueueMsgDecoder<T> decoder;
59 private final TbPubSubSettings pubSubSettings; 64 private final TbPubSubSettings pubSubSettings;
60 65
61 - private volatile boolean subscribed;  
62 - private volatile Set<TopicPartitionInfo> partitions;  
63 private volatile Set<String> subscriptionNames; 66 private volatile Set<String> subscriptionNames;
64 private final List<AcknowledgeRequest> acknowledgeRequests = new CopyOnWriteArrayList<>(); 67 private final List<AcknowledgeRequest> acknowledgeRequests = new CopyOnWriteArrayList<>();
65 68
66 - private ExecutorService consumerExecutor;  
67 private final SubscriberStub subscriber; 69 private final SubscriberStub subscriber;
68 - private volatile boolean stopped;  
69 -  
70 private volatile int messagesPerTopic; 70 private volatile int messagesPerTopic;
71 71
72 public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) { 72 public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  73 + super(topic);
73 this.admin = admin; 74 this.admin = admin;
74 this.pubSubSettings = pubSubSettings; 75 this.pubSubSettings = pubSubSettings;
75 this.topic = topic; 76 this.topic = topic;
76 this.decoder = decoder; 77 this.decoder = decoder;
77 -  
78 try { 78 try {
79 SubscriberStubSettings subscriberStubSettings = 79 SubscriberStubSettings subscriberStubSettings =
80 SubscriberStubSettings.newBuilder() 80 SubscriberStubSettings.newBuilder()
@@ -84,89 +84,50 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -84,89 +84,50 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
84 .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize()) 84 .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize())
85 .build()) 85 .build())
86 .build(); 86 .build();
87 -  
88 this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings); 87 this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings);
89 } catch (IOException e) { 88 } catch (IOException e) {
90 log.error("Failed to create subscriber.", e); 89 log.error("Failed to create subscriber.", e);
91 throw new RuntimeException("Failed to create subscriber.", e); 90 throw new RuntimeException("Failed to create subscriber.", e);
92 } 91 }
93 - stopped = false;  
94 } 92 }
95 93
96 @Override 94 @Override
97 - public String getTopic() {  
98 - return topic; 95 + protected List<PubsubMessage> doPoll(long durationInMillis) {
  96 + try {
  97 + List<ReceivedMessage> messages = receiveMessages();
  98 + if (!messages.isEmpty()) {
  99 + return messages.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList());
  100 + }
  101 + } catch (ExecutionException | InterruptedException e) {
  102 + if (stopped) {
  103 + log.info("[{}] Pub/Sub consumer is stopped.", topic);
  104 + } else {
  105 + log.error("Failed to receive messages", e);
  106 + }
  107 + }
  108 + return Collections.emptyList();
99 } 109 }
100 110
101 @Override 111 @Override
102 - public void subscribe() {  
103 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
104 - subscribed = false; 112 + protected void doSubscribe(List<String> topicNames) {
  113 + subscriptionNames = new LinkedHashSet<>(topicNames);
  114 + subscriptionNames.forEach(admin::createTopicIfNotExists);
  115 + initNewExecutor(subscriptionNames.size() + 1);
  116 + messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
105 } 117 }
106 118
107 @Override 119 @Override
108 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
109 - this.partitions = partitions;  
110 - subscribed = false; 120 + protected void doCommit() {
  121 + acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall);
  122 + acknowledgeRequests.clear();
111 } 123 }
112 124
113 @Override 125 @Override
114 - public void unsubscribe() {  
115 - stopped = true;  
116 - if (consumerExecutor != null) {  
117 - consumerExecutor.shutdownNow();  
118 - }  
119 - 126 + protected void doUnsubscribe() {
120 if (subscriber != null) { 127 if (subscriber != null) {
121 subscriber.close(); 128 subscriber.close();
122 } 129 }
123 - }  
124 -  
125 - @Override  
126 - public List<T> poll(long durationInMillis) {  
127 - if (!subscribed && partitions == null) {  
128 - try {  
129 - Thread.sleep(durationInMillis);  
130 - } catch (InterruptedException e) {  
131 - log.debug("Failed to await subscription", e);  
132 - }  
133 - } else {  
134 - if (!subscribed) {  
135 - subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet());  
136 - subscriptionNames.forEach(admin::createTopicIfNotExists);  
137 - consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size());  
138 - messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();  
139 - subscribed = true;  
140 - }  
141 - List<ReceivedMessage> messages;  
142 - try {  
143 - messages = receiveMessages();  
144 - if (!messages.isEmpty()) {  
145 - List<T> result = new ArrayList<>();  
146 - messages.forEach(msg -> {  
147 - try {  
148 - result.add(decode(msg.getMessage()));  
149 - } catch (InvalidProtocolBufferException e) {  
150 - log.error("Failed decode record: [{}]", msg);  
151 - }  
152 - });  
153 - return result;  
154 - }  
155 - } catch (ExecutionException | InterruptedException e) {  
156 - if (stopped) {  
157 - log.info("[{}] Pub/Sub consumer is stopped.", topic);  
158 - } else {  
159 - log.error("Failed to receive messages", e);  
160 - }  
161 - }  
162 - }  
163 - return Collections.emptyList();  
164 - }  
165 -  
166 - @Override  
167 - public void commit() {  
168 - acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall);  
169 - acknowledgeRequests.clear(); 130 + shutdownExecutor();
170 } 131 }
171 132
172 private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException { 133 private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException {
@@ -211,6 +172,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -211,6 +172,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
211 return transform.get(); 172 return transform.get();
212 } 173 }
213 174
  175 + @Override
214 public T decode(PubsubMessage message) throws InvalidProtocolBufferException { 176 public T decode(PubsubMessage message) throws InvalidProtocolBufferException {
215 DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class); 177 DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class);
216 return decoder.decode(msg); 178 return decoder.decode(msg);
@@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse; @@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse;
23 import lombok.extern.slf4j.Slf4j; 23 import lombok.extern.slf4j.Slf4j;
24 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 24 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
25 import org.thingsboard.server.queue.TbQueueAdmin; 25 import org.thingsboard.server.queue.TbQueueAdmin;
26 -import org.thingsboard.server.queue.TbQueueConsumer;  
27 import org.thingsboard.server.queue.TbQueueMsg; 26 import org.thingsboard.server.queue.TbQueueMsg;
28 import org.thingsboard.server.queue.TbQueueMsgDecoder; 27 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  28 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
29 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 29 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
30 30
31 import java.io.IOException; 31 import java.io.IOException;
@@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException; @@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException;
37 import java.util.stream.Collectors; 37 import java.util.stream.Collectors;
38 38
39 @Slf4j 39 @Slf4j
40 -public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 40 +public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
41 41
42 private final Gson gson = new Gson(); 42 private final Gson gson = new Gson();
43 private final TbQueueAdmin admin; 43 private final TbQueueAdmin admin;
44 - private final String topic;  
45 private final TbQueueMsgDecoder<T> decoder; 44 private final TbQueueMsgDecoder<T> decoder;
46 - private final TbRabbitMqSettings rabbitMqSettings;  
47 private final Channel channel; 45 private final Channel channel;
48 private final Connection connection; 46 private final Connection connection;
49 47
50 - private volatile Set<TopicPartitionInfo> partitions;  
51 - private volatile boolean subscribed;  
52 private volatile Set<String> queues; 48 private volatile Set<String> queues;
53 - private volatile boolean stopped;  
54 49
55 public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) { 50 public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  51 + super(topic);
56 this.admin = admin; 52 this.admin = admin;
57 this.decoder = decoder; 53 this.decoder = decoder;
58 - this.topic = topic;  
59 - this.rabbitMqSettings = rabbitMqSettings;  
60 try { 54 try {
61 connection = rabbitMqSettings.getConnectionFactory().newConnection(); 55 connection = rabbitMqSettings.getConnectionFactory().newConnection();
62 } catch (IOException | TimeoutException e) { 56 } catch (IOException | TimeoutException e) {
63 log.error("Failed to create connection.", e); 57 log.error("Failed to create connection.", e);
64 throw new RuntimeException("Failed to create connection.", e); 58 throw new RuntimeException("Failed to create connection.", e);
65 } 59 }
66 -  
67 try { 60 try {
68 channel = connection.createChannel(); 61 channel = connection.createChannel();
69 } catch (IOException e) { 62 } catch (IOException e) {
@@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue @@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue
74 } 67 }
75 68
76 @Override 69 @Override
77 - public String getTopic() {  
78 - return topic; 70 + protected List<GetResponse> doPoll(long durationInMillis) {
  71 + List<GetResponse> result = queues.stream()
  72 + .map(queue -> {
  73 + try {
  74 + return channel.basicGet(queue, false);
  75 + } catch (IOException e) {
  76 + log.error("Failed to get messages from queue: [{}]", queue);
  77 + throw new RuntimeException("Failed to get messages from queue.", e);
  78 + }
  79 + }).filter(Objects::nonNull).collect(Collectors.toList());
  80 + if (result.size() > 0) {
  81 + return result;
  82 + } else {
  83 + return Collections.emptyList();
  84 + }
79 } 85 }
80 86
81 @Override 87 @Override
82 - public void subscribe() {  
83 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
84 - subscribed = false; 88 + protected void doSubscribe(List<String> topicNames) {
  89 + queues = partitions.stream()
  90 + .map(TopicPartitionInfo::getFullTopicName)
  91 + .collect(Collectors.toSet());
  92 + queues.forEach(admin::createTopicIfNotExists);
85 } 93 }
86 94
87 @Override 95 @Override
88 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
89 - this.partitions = partitions;  
90 - subscribed = false; 96 + protected void doCommit() {
  97 + try {
  98 + channel.basicAck(0, true);
  99 + } catch (IOException e) {
  100 + log.error("Failed to ack messages.", e);
  101 + }
91 } 102 }
92 103
93 @Override 104 @Override
94 - public void unsubscribe() {  
95 - stopped = true; 105 + protected void doUnsubscribe() {
96 if (channel != null) { 106 if (channel != null) {
97 try { 107 try {
98 channel.close(); 108 channel.close();
@@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue @@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue
109 } 119 }
110 } 120 }
111 121
112 - @Override  
113 - public List<T> poll(long durationInMillis) {  
114 - if (!subscribed && partitions == null) {  
115 - try {  
116 - Thread.sleep(durationInMillis);  
117 - } catch (InterruptedException e) {  
118 - log.debug("Failed to await subscription", e);  
119 - }  
120 - } else {  
121 - if (!subscribed) {  
122 - queues = partitions.stream()  
123 - .map(TopicPartitionInfo::getFullTopicName)  
124 - .collect(Collectors.toSet());  
125 -  
126 - queues.forEach(admin::createTopicIfNotExists);  
127 - subscribed = true;  
128 - }  
129 -  
130 - List<T> result = queues.stream()  
131 - .map(queue -> {  
132 - try {  
133 - return channel.basicGet(queue, false);  
134 - } catch (IOException e) {  
135 - log.error("Failed to get messages from queue: [{}]", queue);  
136 - throw new RuntimeException("Failed to get messages from queue.", e);  
137 - }  
138 - }).filter(Objects::nonNull).map(message -> {  
139 - try {  
140 - return decode(message);  
141 - } catch (InvalidProtocolBufferException e) {  
142 - log.error("Failed to decode message: [{}].", message);  
143 - throw new RuntimeException("Failed to decode message.", e);  
144 - }  
145 - }).collect(Collectors.toList());  
146 - if (result.size() > 0) {  
147 - return result;  
148 - }  
149 - }  
150 - try {  
151 - Thread.sleep(durationInMillis);  
152 - } catch (InterruptedException e) {  
153 - if (!stopped) {  
154 - log.error("Failed to wait.", e);  
155 - }  
156 - }  
157 - return Collections.emptyList();  
158 - }  
159 -  
160 - @Override  
161 - public void commit() {  
162 - try {  
163 - channel.basicAck(0, true);  
164 - } catch (IOException e) {  
165 - log.error("Failed to ack messages.", e);  
166 - }  
167 - }  
168 -  
169 public T decode(GetResponse message) throws InvalidProtocolBufferException { 122 public T decode(GetResponse message) throws InvalidProtocolBufferException {
170 DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class); 123 DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
171 return decoder.decode(msg); 124 return decoder.decode(msg);
@@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message; @@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message;
25 import com.amazonaws.services.sqs.model.ReceiveMessageRequest; 25 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
26 import com.google.common.util.concurrent.Futures; 26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture; 27 import com.google.common.util.concurrent.ListenableFuture;
28 -import com.google.common.util.concurrent.ListeningExecutorService;  
29 -import com.google.common.util.concurrent.MoreExecutors;  
30 import com.google.gson.Gson; 28 import com.google.gson.Gson;
31 import com.google.protobuf.InvalidProtocolBufferException; 29 import com.google.protobuf.InvalidProtocolBufferException;
32 import lombok.Data; 30 import lombok.Data;
33 import lombok.extern.slf4j.Slf4j; 31 import lombok.extern.slf4j.Slf4j;
34 import org.springframework.util.CollectionUtils; 32 import org.springframework.util.CollectionUtils;
35 -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;  
36 import org.thingsboard.server.queue.TbQueueAdmin; 33 import org.thingsboard.server.queue.TbQueueAdmin;
37 -import org.thingsboard.server.queue.TbQueueConsumer;  
38 import org.thingsboard.server.queue.TbQueueMsg; 34 import org.thingsboard.server.queue.TbQueueMsg;
39 import org.thingsboard.server.queue.TbQueueMsgDecoder; 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  36 +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
40 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
41 38
42 -import java.io.IOException;  
43 import java.util.ArrayList; 39 import java.util.ArrayList;
44 import java.util.Collections; 40 import java.util.Collections;
45 import java.util.List; 41 import java.util.List;
@@ -47,34 +43,28 @@ import java.util.Objects; @@ -47,34 +43,28 @@ import java.util.Objects;
47 import java.util.Set; 43 import java.util.Set;
48 import java.util.concurrent.CopyOnWriteArrayList; 44 import java.util.concurrent.CopyOnWriteArrayList;
49 import java.util.concurrent.ExecutionException; 45 import java.util.concurrent.ExecutionException;
50 -import java.util.concurrent.Executors;  
51 import java.util.concurrent.TimeUnit; 46 import java.util.concurrent.TimeUnit;
52 import java.util.stream.Collectors; 47 import java.util.stream.Collectors;
53 import java.util.stream.Stream; 48 import java.util.stream.Stream;
54 49
55 @Slf4j 50 @Slf4j
56 -public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 51 +public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<Message, T> {
57 52
58 private static final int MAX_NUM_MSGS = 10; 53 private static final int MAX_NUM_MSGS = 10;
59 54
60 private final Gson gson = new Gson(); 55 private final Gson gson = new Gson();
61 private final TbQueueAdmin admin; 56 private final TbQueueAdmin admin;
62 private final AmazonSQS sqsClient; 57 private final AmazonSQS sqsClient;
63 - private final String topic;  
64 private final TbQueueMsgDecoder<T> decoder; 58 private final TbQueueMsgDecoder<T> decoder;
65 private final TbAwsSqsSettings sqsSettings; 59 private final TbAwsSqsSettings sqsSettings;
66 60
67 private final List<AwsSqsMsgWrapper> pendingMessages = new CopyOnWriteArrayList<>(); 61 private final List<AwsSqsMsgWrapper> pendingMessages = new CopyOnWriteArrayList<>();
68 private volatile Set<String> queueUrls; 62 private volatile Set<String> queueUrls;
69 - private volatile Set<TopicPartitionInfo> partitions;  
70 - private ListeningExecutorService consumerExecutor;  
71 - private volatile boolean subscribed;  
72 - private volatile boolean stopped = false;  
73 63
74 public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder<T> decoder) { 64 public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  65 + super(topic);
75 this.admin = admin; 66 this.admin = admin;
76 this.decoder = decoder; 67 this.decoder = decoder;
77 - this.topic = topic;  
78 this.sqsSettings = sqsSettings; 68 this.sqsSettings = sqsSettings;
79 69
80 AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); 70 AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
@@ -87,81 +77,64 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -87,81 +77,64 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
87 } 77 }
88 78
89 @Override 79 @Override
90 - public String getTopic() {  
91 - return topic; 80 + protected void doSubscribe(List<String> topicNames) {
  81 + queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());
  82 + initNewExecutor(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1);
92 } 83 }
93 84
94 @Override 85 @Override
95 - public void subscribe() {  
96 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
97 - subscribed = false; 86 + protected List<Message> doPoll(long durationInMillis) {
  87 + if (!pendingMessages.isEmpty()) {
  88 + log.warn("Present {} non committed messages.", pendingMessages.size());
  89 + return Collections.emptyList();
  90 + }
  91 + int duration = (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis);
  92 + List<ListenableFuture<List<Message>>> futureList = queueUrls
  93 + .stream()
  94 + .map(url -> poll(url, duration))
  95 + .collect(Collectors.toList());
  96 + ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);
  97 + try {
  98 + return futureResult.get().stream()
  99 + .flatMap(List::stream)
  100 + .filter(Objects::nonNull)
  101 + .collect(Collectors.toList());
  102 + } catch (InterruptedException | ExecutionException e) {
  103 + if (stopped) {
  104 + log.info("[{}] Aws SQS consumer is stopped.", getTopic());
  105 + } else {
  106 + log.error("Failed to pool messages.", e);
  107 + }
  108 + return Collections.emptyList();
  109 + }
98 } 110 }
99 111
100 @Override 112 @Override
101 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
102 - this.partitions = partitions;  
103 - subscribed = false; 113 + public T decode(Message message) throws InvalidProtocolBufferException {
  114 + DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);
  115 + return decoder.decode(msg);
104 } 116 }
105 117
106 @Override 118 @Override
107 - public void unsubscribe() {  
108 - stopped = true;  
109 -  
110 - if (sqsClient != null) {  
111 - sqsClient.shutdown();  
112 - }  
113 - if (consumerExecutor != null) {  
114 - consumerExecutor.shutdownNow();  
115 - } 119 + protected void doCommit() {
  120 + pendingMessages.forEach(msg ->
  121 + consumerExecutor.submit(() -> {
  122 + List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()
  123 + .stream()
  124 + .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
  125 + .collect(Collectors.toList());
  126 + sqsClient.deleteMessageBatch(msg.getUrl(), entries);
  127 + }));
  128 + pendingMessages.clear();
116 } 129 }
117 130
118 @Override 131 @Override
119 - public List<T> poll(long durationInMillis) {  
120 - if (!subscribed && partitions == null) {  
121 - try {  
122 - Thread.sleep(durationInMillis);  
123 - } catch (InterruptedException e) {  
124 - log.debug("Failed to await subscription", e);  
125 - }  
126 - } else {  
127 - if (!subscribed) {  
128 - List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());  
129 - queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());  
130 - consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1));  
131 - subscribed = true;  
132 - }  
133 -  
134 - if (!pendingMessages.isEmpty()) {  
135 - log.warn("Present {} non committed messages.", pendingMessages.size());  
136 - return Collections.emptyList();  
137 - }  
138 -  
139 - List<ListenableFuture<List<Message>>> futureList = queueUrls  
140 - .stream()  
141 - .map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis)))  
142 - .collect(Collectors.toList());  
143 - ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);  
144 - try {  
145 - return futureResult.get().stream()  
146 - .flatMap(List::stream)  
147 - .map(msg -> {  
148 - try {  
149 - return decode(msg);  
150 - } catch (IOException e) {  
151 - log.error("Failed to decode message: [{}]", msg);  
152 - return null;  
153 - }  
154 - }).filter(Objects::nonNull)  
155 - .collect(Collectors.toList());  
156 - } catch (InterruptedException | ExecutionException e) {  
157 - if (stopped) {  
158 - log.info("[{}] Aws SQS consumer is stopped.", topic);  
159 - } else {  
160 - log.error("Failed to pool messages.", e);  
161 - }  
162 - } 132 + protected void doUnsubscribe() {
  133 + stopped = true;
  134 + if (sqsClient != null) {
  135 + sqsClient.shutdown();
163 } 136 }
164 - return Collections.emptyList(); 137 + shutdownExecutor();
165 } 138 }
166 139
167 private ListenableFuture<List<Message>> poll(String url, int waitTimeSeconds) { 140 private ListenableFuture<List<Message>> poll(String url, int waitTimeSeconds) {
@@ -194,25 +167,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -194,25 +167,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
194 }, consumerExecutor); 167 }, consumerExecutor);
195 } 168 }
196 169
197 - @Override  
198 - public void commit() {  
199 - pendingMessages.forEach(msg ->  
200 - consumerExecutor.submit(() -> {  
201 - List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()  
202 - .stream()  
203 - .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))  
204 - .collect(Collectors.toList());  
205 - sqsClient.deleteMessageBatch(msg.getUrl(), entries);  
206 - }));  
207 -  
208 - pendingMessages.clear();  
209 - }  
210 -  
211 - public T decode(Message message) throws InvalidProtocolBufferException {  
212 - DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);  
213 - return decoder.decode(msg);  
214 - }  
215 -  
216 @Data 170 @Data
217 private static class AwsSqsMsgWrapper { 171 private static class AwsSqsMsgWrapper {
218 private final String url; 172 private final String url;