Commit 00e32ee8a389e582d073cda3beeebb2b1e871378

Authored by YevhenBondarenko
2 parents 06c3caf0 8d5c38b7

merge with feature/queue-consumers-refactoring

Showing 23 changed files with 438 additions and 459 deletions
... ... @@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
23 23 import org.thingsboard.server.actors.ActorSystemContext;
24 24 import org.thingsboard.server.common.msg.queue.ServiceType;
25 25 import org.thingsboard.server.common.msg.queue.TbCallback;
26   -import org.thingsboard.server.gen.transport.TransportProtos;
27 26 import org.thingsboard.server.queue.TbQueueConsumer;
28 27 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
29 28 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
... ...
... ... @@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
23 23 import java.util.Map;
24 24 import java.util.UUID;
25 25 import java.util.concurrent.ConcurrentMap;
26   -import java.util.concurrent.ExecutorService;
27 26 import java.util.concurrent.atomic.AtomicInteger;
28 27 import java.util.function.BiConsumer;
29 28
... ... @@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
77 76 }
78 77 }
79 78 int submitSize = pendingPack.size();
80   - if (log.isInfoEnabled() && submitSize > 0) {
81   - log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
  79 + if (log.isDebugEnabled() && submitSize > 0) {
  80 + log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
82 81 }
83 82 pendingPack.forEach(msgConsumer);
84 83 }
... ...
... ... @@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.gen.transport.TransportProtos;
20 20 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
21 21
22   -import java.util.ArrayList;
23   -import java.util.List;
24 22 import java.util.UUID;
25   -import java.util.concurrent.ConcurrentMap;
26   -import java.util.concurrent.ExecutorService;
27 23 import java.util.function.BiConsumer;
28   -import java.util.function.Function;
29   -import java.util.stream.Collectors;
30 24
31 25 @Slf4j
32 26 public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
... ... @@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
37 31
38 32 @Override
39 33 public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
40   - if (log.isInfoEnabled()) {
41   - log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size());
  34 + if (log.isDebugEnabled()) {
  35 + log.debug("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size());
42 36 }
43 37 orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg));
44 38 }
... ...
... ... @@ -15,26 +15,18 @@
15 15 */
16 16 package org.thingsboard.server.service.queue.processing;
17 17
18   -import com.google.protobuf.InvalidProtocolBufferException;
19 18 import lombok.extern.slf4j.Slf4j;
20 19 import org.thingsboard.server.common.data.id.EntityId;
21   -import org.thingsboard.server.common.data.id.EntityIdFactory;
22   -import org.thingsboard.server.common.msg.TbMsg;
23   -import org.thingsboard.server.common.msg.gen.MsgProtos;
24   -import org.thingsboard.server.common.msg.queue.TbMsgCallback;
25 20 import org.thingsboard.server.gen.transport.TransportProtos;
26 21 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
27 22
28   -import java.util.ArrayList;
29 23 import java.util.LinkedList;
30 24 import java.util.List;
31 25 import java.util.Queue;
32 26 import java.util.UUID;
33 27 import java.util.concurrent.ConcurrentHashMap;
34 28 import java.util.concurrent.ConcurrentMap;
35   -import java.util.concurrent.atomic.AtomicInteger;
36 29 import java.util.function.BiConsumer;
37   -import java.util.stream.Collectors;
38 30
39 31 @Slf4j
40 32 public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
... ...
... ... @@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy
30 30 @Override
31 31 protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
32 32 return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
33   -
34 33 }
35 34 }
... ...
... ... @@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.server.gen.transport.TransportProtos;
20 20 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
21 21
22   -import java.util.LinkedHashMap;
23   -import java.util.Map;
24 22 import java.util.UUID;
25 23 import java.util.concurrent.ConcurrentMap;
26 24 import java.util.concurrent.atomic.AtomicInteger;
... ... @@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu
63 61 if (idx < listSize) {
64 62 IdMsgPair pair = orderedMsgList.get(idx);
65 63 expectedMsgId = pair.uuid;
66   - if (log.isInfoEnabled()) {
67   - log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg);
  64 + if (log.isDebugEnabled()) {
  65 + log.debug("[{}] submitting [{}] message to rule engine", queueName, pair.msg);
68 66 }
69 67 msgConsumer.accept(pair.uuid, pair.msg);
70 68 }
... ...
... ... @@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory {
82 82 retryCount++;
83 83 double failedCount = result.getFailedMap().size() + result.getPendingMap().size();
84 84 if (maxRetries > 0 && retryCount > maxRetries) {
85   - log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName);
  85 + log.debug("[{}] Skip reprocess of the rule engine pack due to max retries", queueName);
86 86 return new TbRuleEngineProcessingDecision(true, null);
87 87 } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) {
88   - log.info("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName);
  88 + log.debug("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName);
89 89 return new TbRuleEngineProcessingDecision(true, null);
90 90 } else {
91 91 ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toReprocess = new ConcurrentHashMap<>(initialTotalCount);
... ... @@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory {
98 98 if (retrySuccessful) {
99 99 result.getSuccessMap().forEach(toReprocess::put);
100 100 }
101   - log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
  101 + log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
102 102 if (log.isTraceEnabled()) {
103 103 toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
104 104 }
... ... @@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory {
126 126 @Override
127 127 public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) {
128 128 if (!result.isSuccess()) {
129   - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
  129 + log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
130 130 }
131 131 if (log.isTraceEnabled()) {
132 132 result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
... ...
... ... @@ -67,6 +67,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
67 67
68 68 try {
69 69 QueueDescription queueDescription = new QueueDescription(topic);
  70 + queueDescription.setRequiresDuplicateDetection(false);
70 71 setQueueConfigs(queueDescription);
71 72
72 73 client.createQueue(queueDescription);
... ...
... ... @@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
31 31 import org.springframework.util.CollectionUtils;
32 32 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
33 33 import org.thingsboard.server.queue.TbQueueAdmin;
34   -import org.thingsboard.server.queue.TbQueueConsumer;
35 34 import org.thingsboard.server.queue.TbQueueMsg;
36 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  36 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
37 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
38 38
39 39 import java.time.Duration;
... ... @@ -50,102 +50,72 @@ import java.util.stream.Collectors;
50 50 import java.util.stream.Stream;
51 51
52 52 @Slf4j
53   -public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
  53 +public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<MessageWithDeliveryTag, T> {
54 54 private final TbQueueAdmin admin;
55   - private final String topic;
56 55 private final TbQueueMsgDecoder<T> decoder;
57 56 private final TbServiceBusSettings serviceBusSettings;
58 57
59 58 private final Gson gson = new Gson();
60 59
61 60 private Set<CoreMessageReceiver> receivers;
62   - private volatile Set<TopicPartitionInfo> partitions;
63   - private volatile boolean subscribed;
64   - private volatile boolean stopped = false;
65   - private Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
  61 + private final Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
66 62 private volatile int messagesPerQueue;
67 63
68 64 public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  65 + super(topic);
69 66 this.admin = admin;
70 67 this.decoder = decoder;
71   - this.topic = topic;
72 68 this.serviceBusSettings = serviceBusSettings;
73 69 }
74 70
75 71 @Override
76   - public String getTopic() {
77   - return topic;
  72 + protected List<MessageWithDeliveryTag> doPoll(long durationInMillis) {
  73 + List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =
  74 + receivers.stream()
  75 + .map(receiver -> receiver
  76 + .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))
  77 + .whenComplete((messages, err) -> {
  78 + if (!CollectionUtils.isEmpty(messages)) {
  79 + pendingMessages.put(receiver, messages);
  80 + } else if (err != null) {
  81 + log.error("Failed to receive messages.", err);
  82 + }
  83 + }))
  84 + .collect(Collectors.toList());
  85 + try {
  86 + return fromList(messageFutures)
  87 + .get()
  88 + .stream()
  89 + .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())
  90 + .collect(Collectors.toList());
  91 + } catch (InterruptedException | ExecutionException e) {
  92 + if (stopped) {
  93 + log.info("[{}] Service Bus consumer is stopped.", getTopic());
  94 + } else {
  95 + log.error("Failed to receive messages", e);
  96 + }
  97 + return Collections.emptyList();
  98 + }
78 99 }
79 100
80 101 @Override
81   - public void subscribe() {
82   - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
83   - subscribed = false;
  102 + protected void doSubscribe(List<String> topicNames) {
  103 + createReceivers();
  104 + messagesPerQueue = receivers.size() / partitions.size();
84 105 }
85 106
86 107 @Override
87   - public void subscribe(Set<TopicPartitionInfo> partitions) {
88   - this.partitions = partitions;
89   - subscribed = false;
  108 + protected void doCommit() {
  109 + pendingMessages.forEach((receiver, msgs) ->
  110 + msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
  111 + pendingMessages.clear();
90 112 }
91 113
92 114 @Override
93   - public void unsubscribe() {
94   - stopped = true;
  115 + protected void doUnsubscribe() {
95 116 receivers.forEach(CoreMessageReceiver::closeAsync);
96 117 }
97 118
98   - @Override
99   - public List<T> poll(long durationInMillis) {
100   - if (!subscribed && partitions == null) {
101   - try {
102   - Thread.sleep(durationInMillis);
103   - } catch (InterruptedException e) {
104   - log.debug("Failed to await subscription", e);
105   - }
106   - } else {
107   - if (!subscribed) {
108   - createReceivers();
109   - messagesPerQueue = receivers.size() / partitions.size();
110   - subscribed = true;
111   - }
112   -
113   - List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =
114   - receivers.stream()
115   - .map(receiver -> receiver
116   - .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))
117   - .whenComplete((messages, err) -> {
118   - if (!CollectionUtils.isEmpty(messages)) {
119   - pendingMessages.put(receiver, messages);
120   - } else if (err != null) {
121   - log.error("Failed to receive messages.", err);
122   - }
123   - }))
124   - .collect(Collectors.toList());
125   - try {
126   - return fromList(messageFutures)
127   - .get()
128   - .stream()
129   - .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())
130   - .map(message -> {
131   - try {
132   - return decode(message);
133   - } catch (InvalidProtocolBufferException e) {
134   - log.error("Failed to parse message.", e);
135   - throw new RuntimeException("Failed to parse message.", e);
136   - }
137   - }).collect(Collectors.toList());
138   - } catch (InterruptedException | ExecutionException e) {
139   - if (stopped) {
140   - log.info("[{}] Service Bus consumer is stopped.", topic);
141   - } else {
142   - log.error("Failed to receive messages", e);
143   - }
144   - }
145   - }
146   - return Collections.emptyList();
147   - }
148   -
149 119 private void createReceivers() {
150 120 List<CompletableFuture<CoreMessageReceiver>> receiverFutures = partitions.stream()
151 121 .map(TopicPartitionInfo::getFullTopicName)
... ... @@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue
167 137 receivers = new HashSet<>(fromList(receiverFutures).get());
168 138 } catch (InterruptedException | ExecutionException e) {
169 139 if (stopped) {
170   - log.info("[{}] Service Bus consumer is stopped.", topic);
  140 + log.info("[{}] Service Bus consumer is stopped.", getTopic());
171 141 } else {
172 142 log.error("Failed to create receivers", e);
173 143 }
... ... @@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue
196 166 }
197 167
198 168 @Override
199   - public void commit() {
200   - pendingMessages.forEach((receiver, msgs) ->
201   - msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
202   - pendingMessages.clear();
203   - }
204   -
205   - private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
  169 + protected T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
206 170 DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class);
207 171 return decoder.decode(msg);
208 172 }
... ...
... ... @@ -33,6 +33,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
33 33 import java.util.HashMap;
34 34 import java.util.Map;
35 35 import java.util.concurrent.CompletableFuture;
  36 +import java.util.concurrent.ConcurrentHashMap;
36 37 import java.util.concurrent.ExecutorService;
37 38 import java.util.concurrent.Executors;
38 39
... ... @@ -42,14 +43,14 @@ public class TbServiceBusProducerTemplate<T extends TbQueueMsg> implements TbQue
42 43 private final Gson gson = new Gson();
43 44 private final TbQueueAdmin admin;
44 45 private final TbServiceBusSettings serviceBusSettings;
45   - private final Map<String, QueueClient> clients = new HashMap<>();
46   - private ExecutorService executorService;
  46 + private final Map<String, QueueClient> clients = new ConcurrentHashMap<>();
  47 + private final ExecutorService executorService;
47 48
48 49 public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) {
49 50 this.admin = admin;
50 51 this.defaultTopic = defaultTopic;
51 52 this.serviceBusSettings = serviceBusSettings;
52   - executorService = Executors.newSingleThreadExecutor();
  53 + executorService = Executors.newCachedThreadPool();
53 54 }
54 55
55 56 @Override
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.common;
  17 +
  18 +import com.google.common.util.concurrent.ListeningExecutorService;
  19 +import com.google.common.util.concurrent.MoreExecutors;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.queue.TbQueueMsg;
  22 +
  23 +import java.util.concurrent.Executors;
  24 +import java.util.concurrent.TimeUnit;
  25 +
  26 +@Slf4j
  27 +public abstract class AbstractParallelTbQueueConsumerTemplate<R, T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<R, T> {
  28 +
  29 + protected ListeningExecutorService consumerExecutor;
  30 +
  31 + public AbstractParallelTbQueueConsumerTemplate(String topic) {
  32 + super(topic);
  33 + }
  34 +
  35 + protected void initNewExecutor(int threadPoolSize) {
  36 + if (consumerExecutor != null) {
  37 + consumerExecutor.shutdown();
  38 + try {
  39 + consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
  40 + } catch (InterruptedException e) {
  41 + log.trace("Interrupted while waiting for consumer executor to stop");
  42 + }
  43 + }
  44 + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize));
  45 + }
  46 +
  47 + protected void shutdownExecutor() {
  48 + if (consumerExecutor != null) {
  49 + consumerExecutor.shutdownNow();
  50 + }
  51 + }
  52 +
  53 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.common;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  21 +import org.thingsboard.server.queue.TbQueueConsumer;
  22 +import org.thingsboard.server.queue.TbQueueMsg;
  23 +
  24 +import java.io.IOException;
  25 +import java.util.ArrayList;
  26 +import java.util.Collections;
  27 +import java.util.List;
  28 +import java.util.Set;
  29 +import java.util.concurrent.locks.Lock;
  30 +import java.util.concurrent.locks.ReentrantLock;
  31 +import java.util.stream.Collectors;
  32 +
  33 +@Slf4j
  34 +public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> {
  35 +
  36 + private volatile boolean subscribed;
  37 + protected volatile boolean stopped = false;
  38 + protected volatile Set<TopicPartitionInfo> partitions;
  39 + protected final Lock consumerLock = new ReentrantLock();
  40 +
  41 + @Getter
  42 + private final String topic;
  43 +
  44 + public AbstractTbQueueConsumerTemplate(String topic) {
  45 + this.topic = topic;
  46 + }
  47 +
  48 + @Override
  49 + public void subscribe() {
  50 + consumerLock.lock();
  51 + try {
  52 + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
  53 + subscribed = false;
  54 + } finally {
  55 + consumerLock.unlock();
  56 + }
  57 + }
  58 +
  59 + @Override
  60 + public void subscribe(Set<TopicPartitionInfo> partitions) {
  61 + consumerLock.lock();
  62 + try {
  63 + this.partitions = partitions;
  64 + subscribed = false;
  65 + } finally {
  66 + consumerLock.unlock();
  67 + }
  68 + }
  69 +
  70 + @Override
  71 + public List<T> poll(long durationInMillis) {
  72 + if (!subscribed && partitions == null) {
  73 + try {
  74 + Thread.sleep(durationInMillis);
  75 + } catch (InterruptedException e) {
  76 + log.debug("Failed to await subscription", e);
  77 + }
  78 + } else {
  79 + long pollStartTs = System.currentTimeMillis();
  80 + consumerLock.lock();
  81 + try {
  82 + if (!subscribed) {
  83 + List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
  84 + doSubscribe(topicNames);
  85 + subscribed = true;
  86 + }
  87 +
  88 + List<R> records = doPoll(durationInMillis);
  89 + if (!records.isEmpty()) {
  90 + List<T> result = new ArrayList<>(records.size());
  91 + records.forEach(record -> {
  92 + try {
  93 + if (record != null) {
  94 + result.add(decode(record));
  95 + }
  96 + } catch (IOException e) {
  97 + log.error("Failed decode record: [{}]", record);
  98 + throw new RuntimeException("Failed to decode record: ", e);
  99 + }
  100 + });
  101 + return result;
  102 + } else {
  103 + long pollDuration = System.currentTimeMillis() - pollStartTs;
  104 + if (pollDuration < durationInMillis) {
  105 + try {
  106 + Thread.sleep(durationInMillis - pollDuration);
  107 + } catch (InterruptedException e) {
  108 + if (!stopped) {
  109 + log.error("Failed to wait.", e);
  110 + }
  111 + }
  112 + }
  113 + }
  114 + } finally {
  115 + consumerLock.unlock();
  116 + }
  117 + }
  118 + return Collections.emptyList();
  119 + }
  120 +
  121 + @Override
  122 + public void commit() {
  123 + consumerLock.lock();
  124 + try {
  125 + doCommit();
  126 + } finally {
  127 + consumerLock.unlock();
  128 + }
  129 + }
  130 +
  131 + @Override
  132 + public void unsubscribe() {
  133 + stopped = true;
  134 + consumerLock.lock();
  135 + try {
  136 + doUnsubscribe();
  137 + } finally {
  138 + consumerLock.unlock();
  139 + }
  140 + }
  141 +
  142 + abstract protected List<R> doPoll(long durationInMillis);
  143 +
  144 + abstract protected T decode(R record) throws IOException;
  145 +
  146 + abstract protected void doSubscribe(List<String> topicNames);
  147 +
  148 + abstract protected void doCommit();
  149 +
  150 + abstract protected void doUnsubscribe();
  151 +
  152 +}
... ...
... ... @@ -16,16 +16,14 @@
16 16 package org.thingsboard.server.queue.kafka;
17 17
18 18 import lombok.Builder;
19   -import lombok.Getter;
20 19 import lombok.extern.slf4j.Slf4j;
21 20 import org.apache.kafka.clients.consumer.ConsumerConfig;
22 21 import org.apache.kafka.clients.consumer.ConsumerRecord;
23 22 import org.apache.kafka.clients.consumer.ConsumerRecords;
24 23 import org.apache.kafka.clients.consumer.KafkaConsumer;
25   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
26 24 import org.thingsboard.server.queue.TbQueueAdmin;
27   -import org.thingsboard.server.queue.TbQueueConsumer;
28 25 import org.thingsboard.server.queue.TbQueueMsg;
  26 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
29 27
30 28 import java.io.IOException;
31 29 import java.time.Duration;
... ... @@ -33,26 +31,16 @@ import java.util.ArrayList;
33 31 import java.util.Collections;
34 32 import java.util.List;
35 33 import java.util.Properties;
36   -import java.util.Set;
37   -import java.util.concurrent.locks.Lock;
38   -import java.util.concurrent.locks.ReentrantLock;
39   -import java.util.stream.Collectors;
40 34
41 35 /**
42 36 * Created by ashvayka on 24.09.18.
43 37 */
44 38 @Slf4j
45   -public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
  39 +public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
46 40
47 41 private final TbQueueAdmin admin;
48 42 private final KafkaConsumer<String, byte[]> consumer;
49 43 private final TbKafkaDecoder<T> decoder;
50   - private volatile boolean subscribed;
51   - private volatile Set<TopicPartitionInfo> partitions;
52   - private final Lock consumerLock;
53   -
54   - @Getter
55   - private final String topic;
56 44
57 45 @Builder
58 46 private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
... ... @@ -60,6 +48,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
60 48 boolean autoCommit, int autoCommitIntervalMs,
61 49 int maxPollRecords,
62 50 TbQueueAdmin admin) {
  51 + super(topic);
63 52 Properties props = settings.toProps();
64 53 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
65 54 if (groupId != null) {
... ... @@ -75,86 +64,42 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
75 64 this.admin = admin;
76 65 this.consumer = new KafkaConsumer<>(props);
77 66 this.decoder = decoder;
78   - this.topic = topic;
79   - this.consumerLock = new ReentrantLock();
80   - }
81   -
82   - @Override
83   - public void subscribe() {
84   - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
85   - subscribed = false;
86 67 }
87 68
88 69 @Override
89   - public void subscribe(Set<TopicPartitionInfo> partitions) {
90   - this.partitions = partitions;
91   - subscribed = false;
  70 + protected void doSubscribe(List<String> topicNames) {
  71 + topicNames.forEach(admin::createTopicIfNotExists);
  72 + consumer.subscribe(topicNames);
92 73 }
93 74
94 75 @Override
95   - public List<T> poll(long durationInMillis) {
96   - if (!subscribed && partitions == null) {
97   - try {
98   - Thread.sleep(durationInMillis);
99   - } catch (InterruptedException e) {
100   - log.debug("Failed to await subscription", e);
101   - }
  76 + protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
  77 + ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
  78 + if (records.isEmpty()) {
  79 + return Collections.emptyList();
102 80 } else {
103   - try {
104   - consumerLock.lock();
105   -
106   - if (!subscribed) {
107   - List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
108   - topicNames.forEach(admin::createTopicIfNotExists);
109   - consumer.unsubscribe();
110   - consumer.subscribe(topicNames);
111   - subscribed = true;
112   - }
113   -
114   - ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
115   - if (records.count() > 0) {
116   - List<T> result = new ArrayList<>();
117   - records.forEach(record -> {
118   - try {
119   - result.add(decode(record));
120   - } catch (IOException e) {
121   - log.error("Failed decode record: [{}]", record);
122   - }
123   - });
124   - return result;
125   - }
126   - } finally {
127   - consumerLock.unlock();
128   - }
  81 + List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>(256);
  82 + records.forEach(recordList::add);
  83 + return recordList;
129 84 }
130   - return Collections.emptyList();
131 85 }
132 86
133 87 @Override
134   - public void commit() {
135   - try {
136   - consumerLock.lock();
137   - consumer.commitAsync();
138   - } finally {
139   - consumerLock.unlock();
140   - }
  88 + public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
  89 + return decoder.decode(new KafkaTbQueueMsg(record));
141 90 }
142 91
143 92 @Override
144   - public void unsubscribe() {
145   - try {
146   - consumerLock.lock();
147   - if (consumer != null) {
148   - consumer.unsubscribe();
149   - consumer.close();
150   - }
151   - } finally {
152   - consumerLock.unlock();
153   - }
  93 + protected void doCommit() {
  94 + consumer.commitAsync();
154 95 }
155 96
156   - public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
157   - return decoder.decode(new KafkaTbQueueMsg(record));
  97 + @Override
  98 + protected void doUnsubscribe() {
  99 + if (consumer != null) {
  100 + consumer.unsubscribe();
  101 + consumer.close();
  102 + }
158 103 }
159 104
160 105 }
... ...
... ... @@ -30,27 +30,25 @@ import com.google.pubsub.v1.PullResponse;
30 30 import com.google.pubsub.v1.ReceivedMessage;
31 31 import lombok.extern.slf4j.Slf4j;
32 32 import org.springframework.util.CollectionUtils;
33   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
34 33 import org.thingsboard.server.queue.TbQueueAdmin;
35   -import org.thingsboard.server.queue.TbQueueConsumer;
36 34 import org.thingsboard.server.queue.TbQueueMsg;
37 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  36 +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
38 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
39 38
40 39 import java.io.IOException;
41 40 import java.util.ArrayList;
42 41 import java.util.Collections;
  42 +import java.util.LinkedHashSet;
43 43 import java.util.List;
44 44 import java.util.Objects;
45 45 import java.util.Set;
46 46 import java.util.concurrent.CopyOnWriteArrayList;
47 47 import java.util.concurrent.ExecutionException;
48   -import java.util.concurrent.ExecutorService;
49   -import java.util.concurrent.Executors;
50 48 import java.util.stream.Collectors;
51 49
52 50 @Slf4j
53   -public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
  51 +public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<PubsubMessage, T> {
54 52
55 53 private final Gson gson = new Gson();
56 54 private final TbQueueAdmin admin;
... ... @@ -58,23 +56,18 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
58 56 private final TbQueueMsgDecoder<T> decoder;
59 57 private final TbPubSubSettings pubSubSettings;
60 58
61   - private volatile boolean subscribed;
62   - private volatile Set<TopicPartitionInfo> partitions;
63 59 private volatile Set<String> subscriptionNames;
64 60 private final List<AcknowledgeRequest> acknowledgeRequests = new CopyOnWriteArrayList<>();
65 61
66   - private ExecutorService consumerExecutor;
67 62 private final SubscriberStub subscriber;
68   - private volatile boolean stopped;
69   -
70 63 private volatile int messagesPerTopic;
71 64
72 65 public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  66 + super(topic);
73 67 this.admin = admin;
74 68 this.pubSubSettings = pubSubSettings;
75 69 this.topic = topic;
76 70 this.decoder = decoder;
77   -
78 71 try {
79 72 SubscriberStubSettings subscriberStubSettings =
80 73 SubscriberStubSettings.newBuilder()
... ... @@ -84,94 +77,50 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
84 77 .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize())
85 78 .build())
86 79 .build();
87   -
88 80 this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings);
89 81 } catch (IOException e) {
90 82 log.error("Failed to create subscriber.", e);
91 83 throw new RuntimeException("Failed to create subscriber.", e);
92 84 }
93   - stopped = false;
94 85 }
95 86
96 87 @Override
97   - public String getTopic() {
98   - return topic;
  88 + protected List<PubsubMessage> doPoll(long durationInMillis) {
  89 + try {
  90 + List<ReceivedMessage> messages = receiveMessages();
  91 + if (!messages.isEmpty()) {
  92 + return messages.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList());
  93 + }
  94 + } catch (ExecutionException | InterruptedException e) {
  95 + if (stopped) {
  96 + log.info("[{}] Pub/Sub consumer is stopped.", topic);
  97 + } else {
  98 + log.error("Failed to receive messages", e);
  99 + }
  100 + }
  101 + return Collections.emptyList();
99 102 }
100 103
101 104 @Override
102   - public void subscribe() {
103   - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
104   - subscribed = false;
  105 + protected void doSubscribe(List<String> topicNames) {
  106 + subscriptionNames = new LinkedHashSet<>(topicNames);
  107 + subscriptionNames.forEach(admin::createTopicIfNotExists);
  108 + initNewExecutor(subscriptionNames.size() + 1);
  109 + messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
105 110 }
106 111
107 112 @Override
108   - public void subscribe(Set<TopicPartitionInfo> partitions) {
109   - this.partitions = partitions;
110   - subscribed = false;
  113 + protected void doCommit() {
  114 + acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall);
  115 + acknowledgeRequests.clear();
111 116 }
112 117
113 118 @Override
114   - public void unsubscribe() {
115   - stopped = true;
116   - if (consumerExecutor != null) {
117   - consumerExecutor.shutdownNow();
118   - }
119   -
  119 + protected void doUnsubscribe() {
120 120 if (subscriber != null) {
121 121 subscriber.close();
122 122 }
123   - }
124   -
125   - @Override
126   - public List<T> poll(long durationInMillis) {
127   - if (!subscribed && partitions == null) {
128   - try {
129   - Thread.sleep(durationInMillis);
130   - } catch (InterruptedException e) {
131   - log.debug("Failed to await subscription", e);
132   - }
133   - } else {
134   - if (!subscribed) {
135   - subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet());
136   - subscriptionNames.forEach(admin::createTopicIfNotExists);
137   -
138   - if (consumerExecutor != null) {
139   - consumerExecutor.shutdown();
140   - }
141   -
142   - consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size());
143   - messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
144   - subscribed = true;
145   - }
146   - List<ReceivedMessage> messages;
147   - try {
148   - messages = receiveMessages();
149   - if (!messages.isEmpty()) {
150   - List<T> result = new ArrayList<>();
151   - messages.forEach(msg -> {
152   - try {
153   - result.add(decode(msg.getMessage()));
154   - } catch (InvalidProtocolBufferException e) {
155   - log.error("Failed decode record: [{}]", msg);
156   - }
157   - });
158   - return result;
159   - }
160   - } catch (ExecutionException | InterruptedException e) {
161   - if (stopped) {
162   - log.info("[{}] Pub/Sub consumer is stopped.", topic);
163   - } else {
164   - log.error("Failed to receive messages", e);
165   - }
166   - }
167   - }
168   - return Collections.emptyList();
169   - }
170   -
171   - @Override
172   - public void commit() {
173   - acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall);
174   - acknowledgeRequests.clear();
  123 + shutdownExecutor();
175 124 }
176 125
177 126 private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException {
... ... @@ -180,7 +129,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
180 129 PullRequest pullRequest =
181 130 PullRequest.newBuilder()
182 131 .setMaxMessages(messagesPerTopic)
183   - .setReturnImmediately(false) // return immediately if messages are not available
  132 +// .setReturnImmediately(false) // return immediately if messages are not available
184 133 .setSubscription(subscriptionName)
185 134 .build();
186 135
... ... @@ -216,6 +165,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
216 165 return transform.get();
217 166 }
218 167
  168 + @Override
219 169 public T decode(PubsubMessage message) throws InvalidProtocolBufferException {
220 170 DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class);
221 171 return decoder.decode(msg);
... ...
... ... @@ -49,7 +49,7 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
49 49
50 50 private final Map<String, Publisher> publisherMap = new ConcurrentHashMap<>();
51 51
52   - private ExecutorService pubExecutor = Executors.newCachedThreadPool();
  52 + private final ExecutorService pubExecutor = Executors.newCachedThreadPool();
53 53
54 54 public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) {
55 55 this.defaultTopic = defaultTopic;
... ...
... ... @@ -27,13 +27,11 @@ import java.util.concurrent.TimeoutException;
27 27 @Slf4j
28 28 public class TbRabbitMqAdmin implements TbQueueAdmin {
29 29
30   - private final TbRabbitMqSettings rabbitMqSettings;
31 30 private final Channel channel;
32 31 private final Connection connection;
33 32 private final Map<String, Object> arguments;
34 33
35 34 public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings, Map<String, Object> arguments) {
36   - this.rabbitMqSettings = rabbitMqSettings;
37 35 this.arguments = arguments;
38 36
39 37 try {
... ...
... ... @@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse;
23 23 import lombok.extern.slf4j.Slf4j;
24 24 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
25 25 import org.thingsboard.server.queue.TbQueueAdmin;
26   -import org.thingsboard.server.queue.TbQueueConsumer;
27 26 import org.thingsboard.server.queue.TbQueueMsg;
28 27 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  28 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
29 29 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
30 30
31 31 import java.io.IOException;
... ... @@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException;
37 37 import java.util.stream.Collectors;
38 38
39 39 @Slf4j
40   -public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
  40 +public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
41 41
42 42 private final Gson gson = new Gson();
43 43 private final TbQueueAdmin admin;
44   - private final String topic;
45 44 private final TbQueueMsgDecoder<T> decoder;
46   - private final TbRabbitMqSettings rabbitMqSettings;
47 45 private final Channel channel;
48 46 private final Connection connection;
49 47
50   - private volatile Set<TopicPartitionInfo> partitions;
51   - private volatile boolean subscribed;
52 48 private volatile Set<String> queues;
53   - private volatile boolean stopped;
54 49
55 50 public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  51 + super(topic);
56 52 this.admin = admin;
57 53 this.decoder = decoder;
58   - this.topic = topic;
59   - this.rabbitMqSettings = rabbitMqSettings;
60 54 try {
61 55 connection = rabbitMqSettings.getConnectionFactory().newConnection();
62 56 } catch (IOException | TimeoutException e) {
63 57 log.error("Failed to create connection.", e);
64 58 throw new RuntimeException("Failed to create connection.", e);
65 59 }
66   -
67 60 try {
68 61 channel = connection.createChannel();
69 62 } catch (IOException e) {
... ... @@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue
74 67 }
75 68
76 69 @Override
77   - public String getTopic() {
78   - return topic;
  70 + protected List<GetResponse> doPoll(long durationInMillis) {
  71 + List<GetResponse> result = queues.stream()
  72 + .map(queue -> {
  73 + try {
  74 + return channel.basicGet(queue, false);
  75 + } catch (IOException e) {
  76 + log.error("Failed to get messages from queue: [{}]", queue);
  77 + throw new RuntimeException("Failed to get messages from queue.", e);
  78 + }
  79 + }).filter(Objects::nonNull).collect(Collectors.toList());
  80 + if (result.size() > 0) {
  81 + return result;
  82 + } else {
  83 + return Collections.emptyList();
  84 + }
79 85 }
80 86
81 87 @Override
82   - public void subscribe() {
83   - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
84   - subscribed = false;
  88 + protected void doSubscribe(List<String> topicNames) {
  89 + queues = partitions.stream()
  90 + .map(TopicPartitionInfo::getFullTopicName)
  91 + .collect(Collectors.toSet());
  92 + queues.forEach(admin::createTopicIfNotExists);
85 93 }
86 94
87 95 @Override
88   - public void subscribe(Set<TopicPartitionInfo> partitions) {
89   - this.partitions = partitions;
90   - subscribed = false;
  96 + protected void doCommit() {
  97 + try {
  98 + channel.basicAck(0, true);
  99 + } catch (IOException e) {
  100 + log.error("Failed to ack messages.", e);
  101 + }
91 102 }
92 103
93 104 @Override
94   - public void unsubscribe() {
95   - stopped = true;
  105 + protected void doUnsubscribe() {
96 106 if (channel != null) {
97 107 try {
98 108 channel.close();
... ... @@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue
109 119 }
110 120 }
111 121
112   - @Override
113   - public List<T> poll(long durationInMillis) {
114   - if (!subscribed && partitions == null) {
115   - try {
116   - Thread.sleep(durationInMillis);
117   - } catch (InterruptedException e) {
118   - log.debug("Failed to await subscription", e);
119   - }
120   - } else {
121   - if (!subscribed) {
122   - queues = partitions.stream()
123   - .map(TopicPartitionInfo::getFullTopicName)
124   - .collect(Collectors.toSet());
125   -
126   - queues.forEach(admin::createTopicIfNotExists);
127   - subscribed = true;
128   - }
129   -
130   - List<T> result = queues.stream()
131   - .map(queue -> {
132   - try {
133   - return channel.basicGet(queue, false);
134   - } catch (IOException e) {
135   - log.error("Failed to get messages from queue: [{}]", queue);
136   - throw new RuntimeException("Failed to get messages from queue.", e);
137   - }
138   - }).filter(Objects::nonNull).map(message -> {
139   - try {
140   - return decode(message);
141   - } catch (InvalidProtocolBufferException e) {
142   - log.error("Failed to decode message: [{}].", message);
143   - throw new RuntimeException("Failed to decode message.", e);
144   - }
145   - }).collect(Collectors.toList());
146   - if (result.size() > 0) {
147   - return result;
148   - }
149   - }
150   - try {
151   - Thread.sleep(durationInMillis);
152   - } catch (InterruptedException e) {
153   - if (!stopped) {
154   - log.error("Failed to wait.", e);
155   - }
156   - }
157   - return Collections.emptyList();
158   - }
159   -
160   - @Override
161   - public void commit() {
162   - try {
163   - channel.basicAck(0, true);
164   - } catch (IOException e) {
165   - log.error("Failed to ack messages.", e);
166   - }
167   - }
168   -
169 122 public T decode(GetResponse message) throws InvalidProtocolBufferException {
170 123 DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
171 124 return decoder.decode(msg);
... ...
... ... @@ -30,6 +30,8 @@ import org.thingsboard.server.queue.TbQueueProducer;
30 30 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
31 31
32 32 import java.io.IOException;
  33 +import java.util.Set;
  34 +import java.util.concurrent.ConcurrentHashMap;
33 35 import java.util.concurrent.Executors;
34 36 import java.util.concurrent.TimeoutException;
35 37
... ... @@ -39,10 +41,12 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
39 41 private final Gson gson = new Gson();
40 42 private final TbQueueAdmin admin;
41 43 private final TbRabbitMqSettings rabbitMqSettings;
42   - private ListeningExecutorService producerExecutor;
  44 + private final ListeningExecutorService producerExecutor;
43 45 private final Channel channel;
44 46 private final Connection connection;
45 47
  48 + private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
  49 +
46 50 public TbRabbitMqProducerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
47 51 this.admin = admin;
48 52 this.defaultTopic = defaultTopic;
... ... @@ -75,6 +79,7 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
75 79
76 80 @Override
77 81 public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
  82 + createTopicIfNotExist(tpi);
78 83 AMQP.BasicProperties properties = new AMQP.BasicProperties();
79 84 try {
80 85 channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes());
... ... @@ -110,4 +115,11 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
110 115 }
111 116 }
112 117
  118 + private void createTopicIfNotExist(TopicPartitionInfo tpi) {
  119 + if (topics.contains(tpi)) {
  120 + return;
  121 + }
  122 + admin.createTopicIfNotExists(tpi.getFullTopicName());
  123 + topics.add(tpi);
  124 + }
113 125 }
... ...
... ... @@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message;
25 25 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
26 26 import com.google.common.util.concurrent.Futures;
27 27 import com.google.common.util.concurrent.ListenableFuture;
28   -import com.google.common.util.concurrent.ListeningExecutorService;
29   -import com.google.common.util.concurrent.MoreExecutors;
30 28 import com.google.gson.Gson;
31 29 import com.google.protobuf.InvalidProtocolBufferException;
32 30 import lombok.Data;
33 31 import lombok.extern.slf4j.Slf4j;
34 32 import org.springframework.util.CollectionUtils;
35   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
36 33 import org.thingsboard.server.queue.TbQueueAdmin;
37   -import org.thingsboard.server.queue.TbQueueConsumer;
38 34 import org.thingsboard.server.queue.TbQueueMsg;
39 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  36 +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
40 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
41 38
42   -import java.io.IOException;
43 39 import java.util.ArrayList;
44 40 import java.util.Collections;
45 41 import java.util.List;
... ... @@ -47,34 +43,28 @@ import java.util.Objects;
47 43 import java.util.Set;
48 44 import java.util.concurrent.CopyOnWriteArrayList;
49 45 import java.util.concurrent.ExecutionException;
50   -import java.util.concurrent.Executors;
51 46 import java.util.concurrent.TimeUnit;
52 47 import java.util.stream.Collectors;
53 48 import java.util.stream.Stream;
54 49
55 50 @Slf4j
56   -public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> {
  51 +public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<Message, T> {
57 52
58 53 private static final int MAX_NUM_MSGS = 10;
59 54
60 55 private final Gson gson = new Gson();
61 56 private final TbQueueAdmin admin;
62 57 private final AmazonSQS sqsClient;
63   - private final String topic;
64 58 private final TbQueueMsgDecoder<T> decoder;
65 59 private final TbAwsSqsSettings sqsSettings;
66 60
67 61 private final List<AwsSqsMsgWrapper> pendingMessages = new CopyOnWriteArrayList<>();
68 62 private volatile Set<String> queueUrls;
69   - private volatile Set<TopicPartitionInfo> partitions;
70   - private ListeningExecutorService consumerExecutor;
71   - private volatile boolean subscribed;
72   - private volatile boolean stopped = false;
73 63
74 64 public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  65 + super(topic);
75 66 this.admin = admin;
76 67 this.decoder = decoder;
77   - this.topic = topic;
78 68 this.sqsSettings = sqsSettings;
79 69
80 70 AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
... ... @@ -87,86 +77,60 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
87 77 }
88 78
89 79 @Override
90   - public String getTopic() {
91   - return topic;
  80 + protected void doSubscribe(List<String> topicNames) {
  81 + queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());
  82 + initNewExecutor(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1);
92 83 }
93 84
94 85 @Override
95   - public void subscribe() {
96   - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
97   - subscribed = false;
  86 + protected List<Message> doPoll(long durationInMillis) {
  87 + int duration = (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis);
  88 + List<ListenableFuture<List<Message>>> futureList = queueUrls
  89 + .stream()
  90 + .map(url -> poll(url, duration))
  91 + .collect(Collectors.toList());
  92 + ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);
  93 + try {
  94 + return futureResult.get().stream()
  95 + .flatMap(List::stream)
  96 + .filter(Objects::nonNull)
  97 + .collect(Collectors.toList());
  98 + } catch (InterruptedException | ExecutionException e) {
  99 + if (stopped) {
  100 + log.info("[{}] Aws SQS consumer is stopped.", getTopic());
  101 + } else {
  102 + log.error("Failed to pool messages.", e);
  103 + }
  104 + return Collections.emptyList();
  105 + }
98 106 }
99 107
100 108 @Override
101   - public void subscribe(Set<TopicPartitionInfo> partitions) {
102   - this.partitions = partitions;
103   - subscribed = false;
  109 + public T decode(Message message) throws InvalidProtocolBufferException {
  110 + DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);
  111 + return decoder.decode(msg);
104 112 }
105 113
106 114 @Override
107   - public void unsubscribe() {
108   - stopped = true;
109   -
110   - if (sqsClient != null) {
111   - sqsClient.shutdown();
112   - }
113   - if (consumerExecutor != null) {
114   - consumerExecutor.shutdownNow();
115   - }
  115 + protected void doCommit() {
  116 + pendingMessages.forEach(msg ->
  117 + consumerExecutor.submit(() -> {
  118 + List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()
  119 + .stream()
  120 + .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
  121 + .collect(Collectors.toList());
  122 + sqsClient.deleteMessageBatch(msg.getUrl(), entries);
  123 + }));
  124 + pendingMessages.clear();
116 125 }
117 126
118 127 @Override
119   - public List<T> poll(long durationInMillis) {
120   - if (!subscribed && partitions == null) {
121   - try {
122   - Thread.sleep(durationInMillis);
123   - } catch (InterruptedException e) {
124   - log.debug("Failed to await subscription", e);
125   - }
126   - } else {
127   - if (!subscribed) {
128   - List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
129   - queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());
130   -
131   - if (consumerExecutor != null) {
132   - consumerExecutor.shutdown();
133   - }
134   -
135   - consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1));
136   - subscribed = true;
137   - }
138   -
139   - if (!pendingMessages.isEmpty()) {
140   - log.warn("Present {} non committed messages.", pendingMessages.size());
141   - return Collections.emptyList();
142   - }
143   -
144   - List<ListenableFuture<List<Message>>> futureList = queueUrls
145   - .stream()
146   - .map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis)))
147   - .collect(Collectors.toList());
148   - ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);
149   - try {
150   - return futureResult.get().stream()
151   - .flatMap(List::stream)
152   - .map(msg -> {
153   - try {
154   - return decode(msg);
155   - } catch (IOException e) {
156   - log.error("Failed to decode message: [{}]", msg);
157   - return null;
158   - }
159   - }).filter(Objects::nonNull)
160   - .collect(Collectors.toList());
161   - } catch (InterruptedException | ExecutionException e) {
162   - if (stopped) {
163   - log.info("[{}] Aws SQS consumer is stopped.", topic);
164   - } else {
165   - log.error("Failed to pool messages.", e);
166   - }
167   - }
  128 + protected void doUnsubscribe() {
  129 + stopped = true;
  130 + if (sqsClient != null) {
  131 + sqsClient.shutdown();
168 132 }
169   - return Collections.emptyList();
  133 + shutdownExecutor();
170 134 }
171 135
172 136 private ListenableFuture<List<Message>> poll(String url, int waitTimeSeconds) {
... ... @@ -198,25 +162,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
198 162 }, consumerExecutor);
199 163 }
200 164
201   - @Override
202   - public void commit() {
203   - pendingMessages.forEach(msg ->
204   - consumerExecutor.submit(() -> {
205   - List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()
206   - .stream()
207   - .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
208   - .collect(Collectors.toList());
209   - sqsClient.deleteMessageBatch(msg.getUrl(), entries);
210   - }));
211   -
212   - pendingMessages.clear();
213   - }
214   -
215   - public T decode(Message message) throws InvalidProtocolBufferException {
216   - DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);
217   - return decoder.decode(msg);
218   - }
219   -
220 165 @Data
221 166 private static class AwsSqsMsgWrapper {
222 167 private final String url;
... ...
... ... @@ -28,7 +28,7 @@ services:
28 28 ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181
29 29 kafka:
30 30 restart: always
31   - image: "wurstmeister/kafka:2.12-2.2.1"
  31 + image: "wurstmeister/kafka:2.12-2.3.0"
32 32 ports:
33 33 - "9092:9092"
34 34 env_file:
... ...
... ... @@ -28,7 +28,7 @@ const secretAccessKey = config.get('aws_sqs.secret_access_key');
28 28 const region = config.get('aws_sqs.region');
29 29 const AWS = require('aws-sdk');
30 30 const queueProperties = config.get('aws_sqs.queue_properties');
31   -const poolInterval = config.get('js.response_poll_interval');
  31 +const pollInterval = config.get('js.response_poll_interval');
32 32
33 33 let queueAttributes = {FifoQueue: 'true'};
34 34 let sqsClient;
... ... @@ -52,7 +52,12 @@ function AwsSqsProducer() {
52 52 queueUrls.set(responseTopic, responseQueueUrl);
53 53 }
54 54
55   - let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: 'js_eval', MessageDeduplicationId: uuid()};
  55 + let params = {
  56 + MessageBody: msgBody,
  57 + QueueUrl: responseQueueUrl,
  58 + MessageGroupId: 'js_eval',
  59 + MessageDeduplicationId: uuid()
  60 + };
56 61
57 62 return new Promise((resolve, reject) => {
58 63 sqsClient.sendMessage(params, function (err, data) {
... ... @@ -98,6 +103,7 @@ function AwsSqsProducer() {
98 103 WaitTimeSeconds: poolInterval / 1000
99 104 };
100 105 while (!stopped) {
  106 + let pollStartTs = new Date().getTime();
101 107 const messages = await new Promise((resolve, reject) => {
102 108 sqsClient.receiveMessage(params, function (err, data) {
103 109 if (err) {
... ... @@ -130,6 +136,11 @@ function AwsSqsProducer() {
130 136 //do nothing
131 137 }
132 138 });
  139 + } else {
  140 + let pollDuration = new Date().getTime() - pollStartTs;
  141 + if (pollDuration < pollInterval) {
  142 + await sleep(pollInterval - pollDuration);
  143 + }
133 144 }
134 145 }
135 146 } catch (e) {
... ... @@ -178,6 +189,12 @@ function parseQueueProperties() {
178 189 });
179 190 }
180 191
  192 +function sleep(ms) {
  193 + return new Promise((resolve) => {
  194 + setTimeout(resolve, ms);
  195 + });
  196 +}
  197 +
181 198 process.on('exit', () => {
182 199 stopped = true;
183 200 logger.info('Aws Sqs client stopped.');
... ...
... ... @@ -27,22 +27,22 @@ const vhost = config.get('rabbitmq.virtual_host');
27 27 const username = config.get('rabbitmq.username');
28 28 const password = config.get('rabbitmq.password');
29 29 const queueProperties = config.get('rabbitmq.queue_properties');
30   -const poolInterval = config.get('js.response_poll_interval');
  30 +const pollInterval = config.get('js.response_poll_interval');
31 31
32 32 const amqp = require('amqplib/callback_api');
33 33
34   -let queueParams = {durable: false, exclusive: false, autoDelete: false};
  34 +let queueOptions = {durable: false, exclusive: false, autoDelete: false};
35 35 let connection;
36 36 let channel;
37 37 let stopped = false;
38   -const responseTopics = [];
  38 +let queues = [];
39 39
40 40 function RabbitMqProducer() {
41 41 this.send = async (responseTopic, scriptId, rawResponse, headers) => {
42 42
43   - if (!responseTopics.includes(responseTopic)) {
  43 + if (!queues.includes(responseTopic)) {
44 44 await createQueue(responseTopic);
45   - responseTopics.push(responseTopic);
  45 + queues.push(responseTopic);
46 46 }
47 47
48 48 let data = JSON.stringify(
... ... @@ -98,6 +98,7 @@ function RabbitMqProducer() {
98 98 const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer());
99 99
100 100 while (!stopped) {
  101 + let pollStartTs = new Date().getTime();
101 102 let message = await new Promise((resolve, reject) => {
102 103 channel.get(requestTopic, {}, function (err, msg) {
103 104 if (err) {
... ... @@ -112,7 +113,10 @@ function RabbitMqProducer() {
112 113 messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
113 114 channel.ack(message);
114 115 } else {
115   - await sleep(poolInterval);
  116 + let pollDuration = new Date().getTime() - pollStartTs;
  117 + if (pollDuration < pollInterval) {
  118 + await sleep(pollInterval - pollDuration);
  119 + }
116 120 }
117 121 }
118 122 } catch (e) {
... ... @@ -123,16 +127,18 @@ function RabbitMqProducer() {
123 127 })();
124 128
125 129 function parseQueueProperties() {
  130 + let args = {};
126 131 const props = queueProperties.split(';');
127 132 props.forEach(p => {
128 133 const delimiterPosition = p.indexOf(':');
129   - queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
  134 + args[p.substring(0, delimiterPosition)] = +p.substring(delimiterPosition + 1);
130 135 });
  136 + queueOptions['arguments'] = args;
131 137 }
132 138
133   -function createQueue(topic) {
  139 +async function createQueue(topic) {
134 140 return new Promise((resolve, reject) => {
135   - channel.assertQueue(topic, queueParams, function (err) {
  141 + channel.assertQueue(topic, queueOptions, function (err) {
136 142 if (err) {
137 143 reject(err);
138 144 } else {
... ...
... ... @@ -140,6 +140,7 @@ function parseQueueProperties() {
140 140 properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
141 141 });
142 142 queueOptions = {
  143 + DuplicateDetection: 'false',
143 144 MaxSizeInMegabytes: properties['maxSizeInMb'],
144 145 DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`,
145 146 LockDuration: `PT${properties['lockDurationInSec']}S`
... ...