Commit c1b5fd5c57de91fdc59ab56100b03465b85837d4

Authored by Andrew Shvayka
Committed by GitHub
2 parents 8b9bb6eb ce01e329

Merge pull request #2714 from thingsboard/feature/queue-consumers-refactoring

Feature/queue consumers refactoring
Showing 33 changed files with 513 additions and 504 deletions
@@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; @@ -23,7 +23,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
23 import org.thingsboard.server.actors.ActorSystemContext; 23 import org.thingsboard.server.actors.ActorSystemContext;
24 import org.thingsboard.server.common.msg.queue.ServiceType; 24 import org.thingsboard.server.common.msg.queue.ServiceType;
25 import org.thingsboard.server.common.msg.queue.TbCallback; 25 import org.thingsboard.server.common.msg.queue.TbCallback;
26 -import org.thingsboard.server.gen.transport.TransportProtos;  
27 import org.thingsboard.server.queue.TbQueueConsumer; 26 import org.thingsboard.server.queue.TbQueueConsumer;
28 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 27 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
29 import org.thingsboard.server.queue.discovery.PartitionChangeEvent; 28 import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap; @@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
23 import java.util.Map; 23 import java.util.Map;
24 import java.util.UUID; 24 import java.util.UUID;
25 import java.util.concurrent.ConcurrentMap; 25 import java.util.concurrent.ConcurrentMap;
26 -import java.util.concurrent.ExecutorService;  
27 import java.util.concurrent.atomic.AtomicInteger; 26 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.function.BiConsumer; 27 import java.util.function.BiConsumer;
29 28
@@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS @@ -77,8 +76,8 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
77 } 76 }
78 } 77 }
79 int submitSize = pendingPack.size(); 78 int submitSize = pendingPack.size();
80 - if (log.isInfoEnabled() && submitSize > 0) {  
81 - log.info("[{}] submitting [{}] messages to rule engine", queueName, submitSize); 79 + if (log.isDebugEnabled() && submitSize > 0) {
  80 + log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize);
82 } 81 }
83 pendingPack.forEach(msgConsumer); 82 pendingPack.forEach(msgConsumer);
84 } 83 }
@@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j; @@ -19,14 +19,8 @@ import lombok.extern.slf4j.Slf4j;
19 import org.thingsboard.server.gen.transport.TransportProtos; 19 import org.thingsboard.server.gen.transport.TransportProtos;
20 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 20 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
21 21
22 -import java.util.ArrayList;  
23 -import java.util.List;  
24 import java.util.UUID; 22 import java.util.UUID;
25 -import java.util.concurrent.ConcurrentMap;  
26 -import java.util.concurrent.ExecutorService;  
27 import java.util.function.BiConsumer; 23 import java.util.function.BiConsumer;
28 -import java.util.function.Function;  
29 -import java.util.stream.Collectors;  
30 24
31 @Slf4j 25 @Slf4j
32 public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { 26 public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
@@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS @@ -37,8 +31,8 @@ public class BurstTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS
37 31
38 @Override 32 @Override
39 public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) { 33 public void submitAttempt(BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer) {
40 - if (log.isInfoEnabled()) {  
41 - log.info("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size()); 34 + if (log.isDebugEnabled()) {
  35 + log.debug("[{}] submitting [{}] messages to rule engine", queueName, orderedMsgList.size());
42 } 36 }
43 orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg)); 37 orderedMsgList.forEach(pair -> msgConsumer.accept(pair.uuid, pair.msg));
44 } 38 }
@@ -15,26 +15,18 @@ @@ -15,26 +15,18 @@
15 */ 15 */
16 package org.thingsboard.server.service.queue.processing; 16 package org.thingsboard.server.service.queue.processing;
17 17
18 -import com.google.protobuf.InvalidProtocolBufferException;  
19 import lombok.extern.slf4j.Slf4j; 18 import lombok.extern.slf4j.Slf4j;
20 import org.thingsboard.server.common.data.id.EntityId; 19 import org.thingsboard.server.common.data.id.EntityId;
21 -import org.thingsboard.server.common.data.id.EntityIdFactory;  
22 -import org.thingsboard.server.common.msg.TbMsg;  
23 -import org.thingsboard.server.common.msg.gen.MsgProtos;  
24 -import org.thingsboard.server.common.msg.queue.TbMsgCallback;  
25 import org.thingsboard.server.gen.transport.TransportProtos; 20 import org.thingsboard.server.gen.transport.TransportProtos;
26 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 21 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
27 22
28 -import java.util.ArrayList;  
29 import java.util.LinkedList; 23 import java.util.LinkedList;
30 import java.util.List; 24 import java.util.List;
31 import java.util.Queue; 25 import java.util.Queue;
32 import java.util.UUID; 26 import java.util.UUID;
33 import java.util.concurrent.ConcurrentHashMap; 27 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap; 28 import java.util.concurrent.ConcurrentMap;
35 -import java.util.concurrent.atomic.AtomicInteger;  
36 import java.util.function.BiConsumer; 29 import java.util.function.BiConsumer;
37 -import java.util.stream.Collectors;  
38 30
39 @Slf4j 31 @Slf4j
40 public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy { 32 public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitStrategy {
@@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy @@ -30,6 +30,5 @@ public class SequentialByTenantIdTbRuleEngineSubmitStrategy extends SequentialBy
30 @Override 30 @Override
31 protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) { 31 protected EntityId getEntityId(TransportProtos.ToRuleEngineMsg msg) {
32 return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); 32 return new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
33 -  
34 } 33 }
35 } 34 }
@@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j; @@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j;
19 import org.thingsboard.server.gen.transport.TransportProtos; 19 import org.thingsboard.server.gen.transport.TransportProtos;
20 import org.thingsboard.server.queue.common.TbProtoQueueMsg; 20 import org.thingsboard.server.queue.common.TbProtoQueueMsg;
21 21
22 -import java.util.LinkedHashMap;  
23 -import java.util.Map;  
24 import java.util.UUID; 22 import java.util.UUID;
25 import java.util.concurrent.ConcurrentMap; 23 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.atomic.AtomicInteger; 24 import java.util.concurrent.atomic.AtomicInteger;
@@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu @@ -63,8 +61,8 @@ public class SequentialTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSu
63 if (idx < listSize) { 61 if (idx < listSize) {
64 IdMsgPair pair = orderedMsgList.get(idx); 62 IdMsgPair pair = orderedMsgList.get(idx);
65 expectedMsgId = pair.uuid; 63 expectedMsgId = pair.uuid;
66 - if (log.isInfoEnabled()) {  
67 - log.info("[{}] submitting [{}] message to rule engine", queueName, pair.msg); 64 + if (log.isDebugEnabled()) {
  65 + log.debug("[{}] submitting [{}] message to rule engine", queueName, pair.msg);
68 } 66 }
69 msgConsumer.accept(pair.uuid, pair.msg); 67 msgConsumer.accept(pair.uuid, pair.msg);
70 } 68 }
@@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -82,10 +82,10 @@ public class TbRuleEngineProcessingStrategyFactory {
82 retryCount++; 82 retryCount++;
83 double failedCount = result.getFailedMap().size() + result.getPendingMap().size(); 83 double failedCount = result.getFailedMap().size() + result.getPendingMap().size();
84 if (maxRetries > 0 && retryCount > maxRetries) { 84 if (maxRetries > 0 && retryCount > maxRetries) {
85 - log.info("[{}] Skip reprocess of the rule engine pack due to max retries", queueName); 85 + log.debug("[{}] Skip reprocess of the rule engine pack due to max retries", queueName);
86 return new TbRuleEngineProcessingDecision(true, null); 86 return new TbRuleEngineProcessingDecision(true, null);
87 } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) { 87 } else if (maxAllowedFailurePercentage > 0 && (failedCount / initialTotalCount) > maxAllowedFailurePercentage) {
88 - log.info("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName); 88 + log.debug("[{}] Skip reprocess of the rule engine pack due to max allowed failure percentage", queueName);
89 return new TbRuleEngineProcessingDecision(true, null); 89 return new TbRuleEngineProcessingDecision(true, null);
90 } else { 90 } else {
91 ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toReprocess = new ConcurrentHashMap<>(initialTotalCount); 91 ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> toReprocess = new ConcurrentHashMap<>(initialTotalCount);
@@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -98,7 +98,7 @@ public class TbRuleEngineProcessingStrategyFactory {
98 if (retrySuccessful) { 98 if (retrySuccessful) {
99 result.getSuccessMap().forEach(toReprocess::put); 99 result.getSuccessMap().forEach(toReprocess::put);
100 } 100 }
101 - log.info("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); 101 + log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
102 if (log.isTraceEnabled()) { 102 if (log.isTraceEnabled()) {
103 toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); 103 toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
104 } 104 }
@@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory { @@ -126,7 +126,7 @@ public class TbRuleEngineProcessingStrategyFactory {
126 @Override 126 @Override
127 public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) { 127 public TbRuleEngineProcessingDecision analyze(TbRuleEngineProcessingResult result) {
128 if (!result.isSuccess()) { 128 if (!result.isSuccess()) {
129 - log.info("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); 129 + log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
130 } 130 }
131 if (log.isTraceEnabled()) { 131 if (log.isTraceEnabled()) {
132 result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); 132 result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
@@ -67,6 +67,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin { @@ -67,6 +67,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
67 67
68 try { 68 try {
69 QueueDescription queueDescription = new QueueDescription(topic); 69 QueueDescription queueDescription = new QueueDescription(topic);
  70 + queueDescription.setRequiresDuplicateDetection(false);
70 setQueueConfigs(queueDescription); 71 setQueueConfigs(queueDescription);
71 72
72 client.createQueue(queueDescription); 73 client.createQueue(queueDescription);
@@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode; @@ -31,9 +31,9 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
31 import org.springframework.util.CollectionUtils; 31 import org.springframework.util.CollectionUtils;
32 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 32 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
33 import org.thingsboard.server.queue.TbQueueAdmin; 33 import org.thingsboard.server.queue.TbQueueAdmin;
34 -import org.thingsboard.server.queue.TbQueueConsumer;  
35 import org.thingsboard.server.queue.TbQueueMsg; 34 import org.thingsboard.server.queue.TbQueueMsg;
36 import org.thingsboard.server.queue.TbQueueMsgDecoder; 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  36 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
38 38
39 import java.time.Duration; 39 import java.time.Duration;
@@ -50,102 +50,72 @@ import java.util.stream.Collectors; @@ -50,102 +50,72 @@ import java.util.stream.Collectors;
50 import java.util.stream.Stream; 50 import java.util.stream.Stream;
51 51
52 @Slf4j 52 @Slf4j
53 -public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 53 +public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<MessageWithDeliveryTag, T> {
54 private final TbQueueAdmin admin; 54 private final TbQueueAdmin admin;
55 - private final String topic;  
56 private final TbQueueMsgDecoder<T> decoder; 55 private final TbQueueMsgDecoder<T> decoder;
57 private final TbServiceBusSettings serviceBusSettings; 56 private final TbServiceBusSettings serviceBusSettings;
58 57
59 private final Gson gson = new Gson(); 58 private final Gson gson = new Gson();
60 59
61 private Set<CoreMessageReceiver> receivers; 60 private Set<CoreMessageReceiver> receivers;
62 - private volatile Set<TopicPartitionInfo> partitions;  
63 - private volatile boolean subscribed;  
64 - private volatile boolean stopped = false;  
65 - private Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>(); 61 + private final Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
66 private volatile int messagesPerQueue; 62 private volatile int messagesPerQueue;
67 63
68 public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) { 64 public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  65 + super(topic);
69 this.admin = admin; 66 this.admin = admin;
70 this.decoder = decoder; 67 this.decoder = decoder;
71 - this.topic = topic;  
72 this.serviceBusSettings = serviceBusSettings; 68 this.serviceBusSettings = serviceBusSettings;
73 } 69 }
74 70
75 @Override 71 @Override
76 - public String getTopic() {  
77 - return topic; 72 + protected List<MessageWithDeliveryTag> doPoll(long durationInMillis) {
  73 + List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =
  74 + receivers.stream()
  75 + .map(receiver -> receiver
  76 + .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))
  77 + .whenComplete((messages, err) -> {
  78 + if (!CollectionUtils.isEmpty(messages)) {
  79 + pendingMessages.put(receiver, messages);
  80 + } else if (err != null) {
  81 + log.error("Failed to receive messages.", err);
  82 + }
  83 + }))
  84 + .collect(Collectors.toList());
  85 + try {
  86 + return fromList(messageFutures)
  87 + .get()
  88 + .stream()
  89 + .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())
  90 + .collect(Collectors.toList());
  91 + } catch (InterruptedException | ExecutionException e) {
  92 + if (stopped) {
  93 + log.info("[{}] Service Bus consumer is stopped.", getTopic());
  94 + } else {
  95 + log.error("Failed to receive messages", e);
  96 + }
  97 + return Collections.emptyList();
  98 + }
78 } 99 }
79 100
80 @Override 101 @Override
81 - public void subscribe() {  
82 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
83 - subscribed = false; 102 + protected void doSubscribe(List<String> topicNames) {
  103 + createReceivers();
  104 + messagesPerQueue = receivers.size() / partitions.size();
84 } 105 }
85 106
86 @Override 107 @Override
87 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
88 - this.partitions = partitions;  
89 - subscribed = false; 108 + protected void doCommit() {
  109 + pendingMessages.forEach((receiver, msgs) ->
  110 + msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));
  111 + pendingMessages.clear();
90 } 112 }
91 113
92 @Override 114 @Override
93 - public void unsubscribe() {  
94 - stopped = true; 115 + protected void doUnsubscribe() {
95 receivers.forEach(CoreMessageReceiver::closeAsync); 116 receivers.forEach(CoreMessageReceiver::closeAsync);
96 } 117 }
97 118
98 - @Override  
99 - public List<T> poll(long durationInMillis) {  
100 - if (!subscribed && partitions == null) {  
101 - try {  
102 - Thread.sleep(durationInMillis);  
103 - } catch (InterruptedException e) {  
104 - log.debug("Failed to await subscription", e);  
105 - }  
106 - } else {  
107 - if (!subscribed) {  
108 - createReceivers();  
109 - messagesPerQueue = receivers.size() / partitions.size();  
110 - subscribed = true;  
111 - }  
112 -  
113 - List<CompletableFuture<Collection<MessageWithDeliveryTag>>> messageFutures =  
114 - receivers.stream()  
115 - .map(receiver -> receiver  
116 - .receiveAsync(messagesPerQueue, Duration.ofMillis(durationInMillis))  
117 - .whenComplete((messages, err) -> {  
118 - if (!CollectionUtils.isEmpty(messages)) {  
119 - pendingMessages.put(receiver, messages);  
120 - } else if (err != null) {  
121 - log.error("Failed to receive messages.", err);  
122 - }  
123 - }))  
124 - .collect(Collectors.toList());  
125 - try {  
126 - return fromList(messageFutures)  
127 - .get()  
128 - .stream()  
129 - .flatMap(messages -> CollectionUtils.isEmpty(messages) ? Stream.empty() : messages.stream())  
130 - .map(message -> {  
131 - try {  
132 - return decode(message);  
133 - } catch (InvalidProtocolBufferException e) {  
134 - log.error("Failed to parse message.", e);  
135 - throw new RuntimeException("Failed to parse message.", e);  
136 - }  
137 - }).collect(Collectors.toList());  
138 - } catch (InterruptedException | ExecutionException e) {  
139 - if (stopped) {  
140 - log.info("[{}] Service Bus consumer is stopped.", topic);  
141 - } else {  
142 - log.error("Failed to receive messages", e);  
143 - }  
144 - }  
145 - }  
146 - return Collections.emptyList();  
147 - }  
148 -  
149 private void createReceivers() { 119 private void createReceivers() {
150 List<CompletableFuture<CoreMessageReceiver>> receiverFutures = partitions.stream() 120 List<CompletableFuture<CoreMessageReceiver>> receiverFutures = partitions.stream()
151 .map(TopicPartitionInfo::getFullTopicName) 121 .map(TopicPartitionInfo::getFullTopicName)
@@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue @@ -167,7 +137,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue
167 receivers = new HashSet<>(fromList(receiverFutures).get()); 137 receivers = new HashSet<>(fromList(receiverFutures).get());
168 } catch (InterruptedException | ExecutionException e) { 138 } catch (InterruptedException | ExecutionException e) {
169 if (stopped) { 139 if (stopped) {
170 - log.info("[{}] Service Bus consumer is stopped.", topic); 140 + log.info("[{}] Service Bus consumer is stopped.", getTopic());
171 } else { 141 } else {
172 log.error("Failed to create receivers", e); 142 log.error("Failed to create receivers", e);
173 } 143 }
@@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue @@ -196,13 +166,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> implements TbQue
196 } 166 }
197 167
198 @Override 168 @Override
199 - public void commit() {  
200 - pendingMessages.forEach((receiver, msgs) ->  
201 - msgs.forEach(msg -> receiver.completeMessageAsync(msg.getDeliveryTag(), TransactionContext.NULL_TXN)));  
202 - pendingMessages.clear();  
203 - }  
204 -  
205 - private T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException { 169 + protected T decode(MessageWithDeliveryTag data) throws InvalidProtocolBufferException {
206 DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class); 170 DefaultTbQueueMsg msg = gson.fromJson(new String(((Data) data.getMessage().getBody()).getValue().getArray()), DefaultTbQueueMsg.class);
207 return decoder.decode(msg); 171 return decoder.decode(msg);
208 } 172 }
@@ -33,6 +33,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg; @@ -33,6 +33,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
33 import java.util.HashMap; 33 import java.util.HashMap;
34 import java.util.Map; 34 import java.util.Map;
35 import java.util.concurrent.CompletableFuture; 35 import java.util.concurrent.CompletableFuture;
  36 +import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutorService; 37 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors; 38 import java.util.concurrent.Executors;
38 39
@@ -42,14 +43,14 @@ public class TbServiceBusProducerTemplate<T extends TbQueueMsg> implements TbQue @@ -42,14 +43,14 @@ public class TbServiceBusProducerTemplate<T extends TbQueueMsg> implements TbQue
42 private final Gson gson = new Gson(); 43 private final Gson gson = new Gson();
43 private final TbQueueAdmin admin; 44 private final TbQueueAdmin admin;
44 private final TbServiceBusSettings serviceBusSettings; 45 private final TbServiceBusSettings serviceBusSettings;
45 - private final Map<String, QueueClient> clients = new HashMap<>();  
46 - private ExecutorService executorService; 46 + private final Map<String, QueueClient> clients = new ConcurrentHashMap<>();
  47 + private final ExecutorService executorService;
47 48
48 public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) { 49 public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) {
49 this.admin = admin; 50 this.admin = admin;
50 this.defaultTopic = defaultTopic; 51 this.defaultTopic = defaultTopic;
51 this.serviceBusSettings = serviceBusSettings; 52 this.serviceBusSettings = serviceBusSettings;
52 - executorService = Executors.newSingleThreadExecutor(); 53 + executorService = Executors.newCachedThreadPool();
53 } 54 }
54 55
55 @Override 56 @Override
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.common;
  17 +
  18 +import com.google.common.util.concurrent.ListeningExecutorService;
  19 +import com.google.common.util.concurrent.MoreExecutors;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.thingsboard.server.queue.TbQueueMsg;
  22 +
  23 +import java.util.concurrent.Executors;
  24 +import java.util.concurrent.TimeUnit;
  25 +
  26 +@Slf4j
  27 +public abstract class AbstractParallelTbQueueConsumerTemplate<R, T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<R, T> {
  28 +
  29 + protected ListeningExecutorService consumerExecutor;
  30 +
  31 + public AbstractParallelTbQueueConsumerTemplate(String topic) {
  32 + super(topic);
  33 + }
  34 +
  35 + protected void initNewExecutor(int threadPoolSize) {
  36 + if (consumerExecutor != null) {
  37 + consumerExecutor.shutdown();
  38 + try {
  39 + consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
  40 + } catch (InterruptedException e) {
  41 + log.trace("Interrupted while waiting for consumer executor to stop");
  42 + }
  43 + }
  44 + consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolSize));
  45 + }
  46 +
  47 + protected void shutdownExecutor() {
  48 + if (consumerExecutor != null) {
  49 + consumerExecutor.shutdownNow();
  50 + }
  51 + }
  52 +
  53 +}
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.queue.common;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
  21 +import org.thingsboard.server.queue.TbQueueConsumer;
  22 +import org.thingsboard.server.queue.TbQueueMsg;
  23 +
  24 +import java.io.IOException;
  25 +import java.util.ArrayList;
  26 +import java.util.Collections;
  27 +import java.util.List;
  28 +import java.util.Set;
  29 +import java.util.concurrent.locks.Lock;
  30 +import java.util.concurrent.locks.ReentrantLock;
  31 +import java.util.stream.Collectors;
  32 +
  33 +@Slf4j
  34 +public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> implements TbQueueConsumer<T> {
  35 +
  36 + private volatile boolean subscribed;
  37 + protected volatile boolean stopped = false;
  38 + protected volatile Set<TopicPartitionInfo> partitions;
  39 + protected final Lock consumerLock = new ReentrantLock();
  40 +
  41 + @Getter
  42 + private final String topic;
  43 +
  44 + public AbstractTbQueueConsumerTemplate(String topic) {
  45 + this.topic = topic;
  46 + }
  47 +
  48 + @Override
  49 + public void subscribe() {
  50 + consumerLock.lock();
  51 + try {
  52 + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));
  53 + subscribed = false;
  54 + } finally {
  55 + consumerLock.unlock();
  56 + }
  57 + }
  58 +
  59 + @Override
  60 + public void subscribe(Set<TopicPartitionInfo> partitions) {
  61 + consumerLock.lock();
  62 + try {
  63 + this.partitions = partitions;
  64 + subscribed = false;
  65 + } finally {
  66 + consumerLock.unlock();
  67 + }
  68 + }
  69 +
  70 + @Override
  71 + public List<T> poll(long durationInMillis) {
  72 + if (!subscribed && partitions == null) {
  73 + try {
  74 + Thread.sleep(durationInMillis);
  75 + } catch (InterruptedException e) {
  76 + log.debug("Failed to await subscription", e);
  77 + }
  78 + } else {
  79 + long pollStartTs = System.currentTimeMillis();
  80 + consumerLock.lock();
  81 + try {
  82 + if (!subscribed) {
  83 + List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
  84 + doSubscribe(topicNames);
  85 + subscribed = true;
  86 + }
  87 +
  88 + List<R> records = doPoll(durationInMillis);
  89 + if (!records.isEmpty()) {
  90 + List<T> result = new ArrayList<>(records.size());
  91 + records.forEach(record -> {
  92 + try {
  93 + if (record != null) {
  94 + result.add(decode(record));
  95 + }
  96 + } catch (IOException e) {
  97 + log.error("Failed decode record: [{}]", record);
  98 + throw new RuntimeException("Failed to decode record: ", e);
  99 + }
  100 + });
  101 + return result;
  102 + } else {
  103 + long pollDuration = System.currentTimeMillis() - pollStartTs;
  104 + if (pollDuration < durationInMillis) {
  105 + try {
  106 + Thread.sleep(durationInMillis - pollDuration);
  107 + } catch (InterruptedException e) {
  108 + if (!stopped) {
  109 + log.error("Failed to wait.", e);
  110 + }
  111 + }
  112 + }
  113 + }
  114 + } finally {
  115 + consumerLock.unlock();
  116 + }
  117 + }
  118 + return Collections.emptyList();
  119 + }
  120 +
  121 + @Override
  122 + public void commit() {
  123 + consumerLock.lock();
  124 + try {
  125 + doCommit();
  126 + } finally {
  127 + consumerLock.unlock();
  128 + }
  129 + }
  130 +
  131 + @Override
  132 + public void unsubscribe() {
  133 + stopped = true;
  134 + consumerLock.lock();
  135 + try {
  136 + doUnsubscribe();
  137 + } finally {
  138 + consumerLock.unlock();
  139 + }
  140 + }
  141 +
  142 + abstract protected List<R> doPoll(long durationInMillis);
  143 +
  144 + abstract protected T decode(R record) throws IOException;
  145 +
  146 + abstract protected void doSubscribe(List<String> topicNames);
  147 +
  148 + abstract protected void doCommit();
  149 +
  150 + abstract protected void doUnsubscribe();
  151 +
  152 +}
@@ -16,16 +16,14 @@ @@ -16,16 +16,14 @@
16 package org.thingsboard.server.queue.kafka; 16 package org.thingsboard.server.queue.kafka;
17 17
18 import lombok.Builder; 18 import lombok.Builder;
19 -import lombok.Getter;  
20 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
21 import org.apache.kafka.clients.consumer.ConsumerConfig; 20 import org.apache.kafka.clients.consumer.ConsumerConfig;
22 import org.apache.kafka.clients.consumer.ConsumerRecord; 21 import org.apache.kafka.clients.consumer.ConsumerRecord;
23 import org.apache.kafka.clients.consumer.ConsumerRecords; 22 import org.apache.kafka.clients.consumer.ConsumerRecords;
24 import org.apache.kafka.clients.consumer.KafkaConsumer; 23 import org.apache.kafka.clients.consumer.KafkaConsumer;
25 -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;  
26 import org.thingsboard.server.queue.TbQueueAdmin; 24 import org.thingsboard.server.queue.TbQueueAdmin;
27 -import org.thingsboard.server.queue.TbQueueConsumer;  
28 import org.thingsboard.server.queue.TbQueueMsg; 25 import org.thingsboard.server.queue.TbQueueMsg;
  26 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
29 27
30 import java.io.IOException; 28 import java.io.IOException;
31 import java.time.Duration; 29 import java.time.Duration;
@@ -33,26 +31,16 @@ import java.util.ArrayList; @@ -33,26 +31,16 @@ import java.util.ArrayList;
33 import java.util.Collections; 31 import java.util.Collections;
34 import java.util.List; 32 import java.util.List;
35 import java.util.Properties; 33 import java.util.Properties;
36 -import java.util.Set;  
37 -import java.util.concurrent.locks.Lock;  
38 -import java.util.concurrent.locks.ReentrantLock;  
39 -import java.util.stream.Collectors;  
40 34
41 /** 35 /**
42 * Created by ashvayka on 24.09.18. 36 * Created by ashvayka on 24.09.18.
43 */ 37 */
44 @Slf4j 38 @Slf4j
45 -public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 39 +public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
46 40
47 private final TbQueueAdmin admin; 41 private final TbQueueAdmin admin;
48 private final KafkaConsumer<String, byte[]> consumer; 42 private final KafkaConsumer<String, byte[]> consumer;
49 private final TbKafkaDecoder<T> decoder; 43 private final TbKafkaDecoder<T> decoder;
50 - private volatile boolean subscribed;  
51 - private volatile Set<TopicPartitionInfo> partitions;  
52 - private final Lock consumerLock;  
53 -  
54 - @Getter  
55 - private final String topic;  
56 44
57 @Builder 45 @Builder
58 private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, 46 private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
@@ -60,6 +48,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -60,6 +48,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
60 boolean autoCommit, int autoCommitIntervalMs, 48 boolean autoCommit, int autoCommitIntervalMs,
61 int maxPollRecords, 49 int maxPollRecords,
62 TbQueueAdmin admin) { 50 TbQueueAdmin admin) {
  51 + super(topic);
63 Properties props = settings.toProps(); 52 Properties props = settings.toProps();
64 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); 53 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
65 if (groupId != null) { 54 if (groupId != null) {
@@ -75,94 +64,42 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon @@ -75,94 +64,42 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> implements TbQueueCon
75 this.admin = admin; 64 this.admin = admin;
76 this.consumer = new KafkaConsumer<>(props); 65 this.consumer = new KafkaConsumer<>(props);
77 this.decoder = decoder; 66 this.decoder = decoder;
78 - this.topic = topic;  
79 - this.consumerLock = new ReentrantLock();  
80 } 67 }
81 68
82 @Override 69 @Override
83 - public void subscribe() {  
84 - consumerLock.lock();  
85 - try {  
86 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
87 - subscribed = false;  
88 - } finally {  
89 - consumerLock.unlock();  
90 - } 70 + protected void doSubscribe(List<String> topicNames) {
  71 + topicNames.forEach(admin::createTopicIfNotExists);
  72 + consumer.subscribe(topicNames);
91 } 73 }
92 74
93 @Override 75 @Override
94 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
95 - consumerLock.lock();  
96 - try {  
97 - this.partitions = partitions;  
98 - subscribed = false;  
99 - } finally {  
100 - consumerLock.unlock(); 76 + protected List<ConsumerRecord<String, byte[]>> doPoll(long durationInMillis) {
  77 + ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));
  78 + if (records.isEmpty()) {
  79 + return Collections.emptyList();
  80 + } else {
  81 + List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>(256);
  82 + records.forEach(recordList::add);
  83 + return recordList;
101 } 84 }
102 } 85 }
103 86
104 @Override 87 @Override
105 - public List<T> poll(long durationInMillis) {  
106 - if (!subscribed && partitions == null) {  
107 - try {  
108 - Thread.sleep(durationInMillis);  
109 - } catch (InterruptedException e) {  
110 - log.debug("Failed to await subscription", e);  
111 - }  
112 - } else {  
113 - consumerLock.lock();  
114 - try {  
115 - if (!subscribed) {  
116 - List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());  
117 - topicNames.forEach(admin::createTopicIfNotExists);  
118 - consumer.subscribe(topicNames);  
119 - subscribed = true;  
120 - }  
121 -  
122 - ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durationInMillis));  
123 - if (records.count() > 0) {  
124 - List<T> result = new ArrayList<>();  
125 - records.forEach(record -> {  
126 - try {  
127 - result.add(decode(record));  
128 - } catch (IOException e) {  
129 - log.error("Failed decode record: [{}]", record);  
130 - }  
131 - });  
132 - return result;  
133 - }  
134 - } finally {  
135 - consumerLock.unlock();  
136 - }  
137 - }  
138 - return Collections.emptyList(); 88 + public T decode(ConsumerRecord<String, byte[]> record) throws IOException {
  89 + return decoder.decode(new KafkaTbQueueMsg(record));
139 } 90 }
140 91
141 @Override 92 @Override
142 - public void commit() {  
143 - consumerLock.lock();  
144 - try {  
145 - consumer.commitAsync();  
146 - } finally {  
147 - consumerLock.unlock();  
148 - } 93 + protected void doCommit() {
  94 + consumer.commitAsync();
149 } 95 }
150 96
151 @Override 97 @Override
152 - public void unsubscribe() {  
153 - consumerLock.lock();  
154 - try {  
155 - if (consumer != null) {  
156 - consumer.unsubscribe();  
157 - consumer.close();  
158 - }  
159 - } finally {  
160 - consumerLock.unlock(); 98 + protected void doUnsubscribe() {
  99 + if (consumer != null) {
  100 + consumer.unsubscribe();
  101 + consumer.close();
161 } 102 }
162 } 103 }
163 104
164 - public T decode(ConsumerRecord<String, byte[]> record) throws IOException {  
165 - return decoder.decode(new KafkaTbQueueMsg(record));  
166 - }  
167 -  
168 } 105 }
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.queue.pubsub; 16 package org.thingsboard.server.queue.pubsub;
17 17
  18 +import com.google.api.gax.rpc.AlreadyExistsException;
18 import com.google.cloud.pubsub.v1.SubscriptionAdminClient; 19 import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
19 import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; 20 import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
20 import com.google.cloud.pubsub.v1.TopicAdminClient; 21 import com.google.cloud.pubsub.v1.TopicAdminClient;
@@ -24,9 +25,9 @@ import com.google.pubsub.v1.ListSubscriptionsRequest; @@ -24,9 +25,9 @@ import com.google.pubsub.v1.ListSubscriptionsRequest;
24 import com.google.pubsub.v1.ListTopicsRequest; 25 import com.google.pubsub.v1.ListTopicsRequest;
25 import com.google.pubsub.v1.ProjectName; 26 import com.google.pubsub.v1.ProjectName;
26 import com.google.pubsub.v1.ProjectSubscriptionName; 27 import com.google.pubsub.v1.ProjectSubscriptionName;
27 -import com.google.pubsub.v1.ProjectTopicName;  
28 import com.google.pubsub.v1.Subscription; 28 import com.google.pubsub.v1.Subscription;
29 import com.google.pubsub.v1.Topic; 29 import com.google.pubsub.v1.Topic;
  30 +import com.google.pubsub.v1.TopicName;
30 import lombok.extern.slf4j.Slf4j; 31 import lombok.extern.slf4j.Slf4j;
31 import org.thingsboard.server.queue.TbQueueAdmin; 32 import org.thingsboard.server.queue.TbQueueAdmin;
32 33
@@ -103,7 +104,10 @@ public class TbPubSubAdmin implements TbQueueAdmin { @@ -103,7 +104,10 @@ public class TbPubSubAdmin implements TbQueueAdmin {
103 104
104 @Override 105 @Override
105 public void createTopicIfNotExists(String partition) { 106 public void createTopicIfNotExists(String partition) {
106 - ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), partition); 107 + TopicName topicName = TopicName.newBuilder()
  108 + .setTopic(partition)
  109 + .setProject(pubSubSettings.getProjectId())
  110 + .build();
107 111
108 if (topicSet.contains(topicName.toString())) { 112 if (topicSet.contains(topicName.toString())) {
109 createSubscriptionIfNotExists(partition, topicName); 113 createSubscriptionIfNotExists(partition, topicName);
@@ -121,13 +125,18 @@ public class TbPubSubAdmin implements TbQueueAdmin { @@ -121,13 +125,18 @@ public class TbPubSubAdmin implements TbQueueAdmin {
121 } 125 }
122 } 126 }
123 127
124 - topicAdminClient.createTopic(topicName);  
125 - topicSet.add(topicName.toString());  
126 - log.info("Created new topic: [{}]", topicName.toString()); 128 + try {
  129 + topicAdminClient.createTopic(topicName);
  130 + log.info("Created new topic: [{}]", topicName.toString());
  131 + } catch (AlreadyExistsException e) {
  132 + log.info("[{}] Topic already exist.", topicName.toString());
  133 + } finally {
  134 + topicSet.add(topicName.toString());
  135 + }
127 createSubscriptionIfNotExists(partition, topicName); 136 createSubscriptionIfNotExists(partition, topicName);
128 } 137 }
129 138
130 - private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) { 139 + private void createSubscriptionIfNotExists(String partition, TopicName topicName) {
131 ProjectSubscriptionName subscriptionName = 140 ProjectSubscriptionName subscriptionName =
132 ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition); 141 ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition);
133 142
@@ -153,9 +162,14 @@ public class TbPubSubAdmin implements TbQueueAdmin { @@ -153,9 +162,14 @@ public class TbPubSubAdmin implements TbQueueAdmin {
153 setAckDeadline(subscriptionBuilder); 162 setAckDeadline(subscriptionBuilder);
154 setMessageRetention(subscriptionBuilder); 163 setMessageRetention(subscriptionBuilder);
155 164
156 - subscriptionAdminClient.createSubscription(subscriptionBuilder.build());  
157 - subscriptionSet.add(subscriptionName.toString());  
158 - log.info("Created new subscription: [{}]", subscriptionName.toString()); 165 + try {
  166 + subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
  167 + log.info("Created new subscription: [{}]", subscriptionName.toString());
  168 + } catch (AlreadyExistsException e) {
  169 + log.info("[{}] Subscription already exist.", subscriptionName.toString());
  170 + } finally {
  171 + subscriptionSet.add(subscriptionName.toString());
  172 + }
159 } 173 }
160 174
161 private void setAckDeadline(Subscription.Builder builder) { 175 private void setAckDeadline(Subscription.Builder builder) {
@@ -30,27 +30,25 @@ import com.google.pubsub.v1.PullResponse; @@ -30,27 +30,25 @@ import com.google.pubsub.v1.PullResponse;
30 import com.google.pubsub.v1.ReceivedMessage; 30 import com.google.pubsub.v1.ReceivedMessage;
31 import lombok.extern.slf4j.Slf4j; 31 import lombok.extern.slf4j.Slf4j;
32 import org.springframework.util.CollectionUtils; 32 import org.springframework.util.CollectionUtils;
33 -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;  
34 import org.thingsboard.server.queue.TbQueueAdmin; 33 import org.thingsboard.server.queue.TbQueueAdmin;
35 -import org.thingsboard.server.queue.TbQueueConsumer;  
36 import org.thingsboard.server.queue.TbQueueMsg; 34 import org.thingsboard.server.queue.TbQueueMsg;
37 import org.thingsboard.server.queue.TbQueueMsgDecoder; 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  36 +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
38 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
39 38
40 import java.io.IOException; 39 import java.io.IOException;
41 import java.util.ArrayList; 40 import java.util.ArrayList;
42 import java.util.Collections; 41 import java.util.Collections;
  42 +import java.util.LinkedHashSet;
43 import java.util.List; 43 import java.util.List;
44 import java.util.Objects; 44 import java.util.Objects;
45 import java.util.Set; 45 import java.util.Set;
46 import java.util.concurrent.CopyOnWriteArrayList; 46 import java.util.concurrent.CopyOnWriteArrayList;
47 import java.util.concurrent.ExecutionException; 47 import java.util.concurrent.ExecutionException;
48 -import java.util.concurrent.ExecutorService;  
49 -import java.util.concurrent.Executors;  
50 import java.util.stream.Collectors; 48 import java.util.stream.Collectors;
51 49
52 @Slf4j 50 @Slf4j
53 -public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 51 +public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<PubsubMessage, T> {
54 52
55 private final Gson gson = new Gson(); 53 private final Gson gson = new Gson();
56 private final TbQueueAdmin admin; 54 private final TbQueueAdmin admin;
@@ -58,23 +56,18 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -58,23 +56,18 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
58 private final TbQueueMsgDecoder<T> decoder; 56 private final TbQueueMsgDecoder<T> decoder;
59 private final TbPubSubSettings pubSubSettings; 57 private final TbPubSubSettings pubSubSettings;
60 58
61 - private volatile boolean subscribed;  
62 - private volatile Set<TopicPartitionInfo> partitions;  
63 private volatile Set<String> subscriptionNames; 59 private volatile Set<String> subscriptionNames;
64 private final List<AcknowledgeRequest> acknowledgeRequests = new CopyOnWriteArrayList<>(); 60 private final List<AcknowledgeRequest> acknowledgeRequests = new CopyOnWriteArrayList<>();
65 61
66 - private ExecutorService consumerExecutor;  
67 private final SubscriberStub subscriber; 62 private final SubscriberStub subscriber;
68 - private volatile boolean stopped;  
69 -  
70 private volatile int messagesPerTopic; 63 private volatile int messagesPerTopic;
71 64
72 public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) { 65 public TbPubSubConsumerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  66 + super(topic);
73 this.admin = admin; 67 this.admin = admin;
74 this.pubSubSettings = pubSubSettings; 68 this.pubSubSettings = pubSubSettings;
75 this.topic = topic; 69 this.topic = topic;
76 this.decoder = decoder; 70 this.decoder = decoder;
77 -  
78 try { 71 try {
79 SubscriberStubSettings subscriberStubSettings = 72 SubscriberStubSettings subscriberStubSettings =
80 SubscriberStubSettings.newBuilder() 73 SubscriberStubSettings.newBuilder()
@@ -84,89 +77,50 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -84,89 +77,50 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
84 .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize()) 77 .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize())
85 .build()) 78 .build())
86 .build(); 79 .build();
87 -  
88 this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings); 80 this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings);
89 } catch (IOException e) { 81 } catch (IOException e) {
90 log.error("Failed to create subscriber.", e); 82 log.error("Failed to create subscriber.", e);
91 throw new RuntimeException("Failed to create subscriber.", e); 83 throw new RuntimeException("Failed to create subscriber.", e);
92 } 84 }
93 - stopped = false;  
94 } 85 }
95 86
96 @Override 87 @Override
97 - public String getTopic() {  
98 - return topic; 88 + protected List<PubsubMessage> doPoll(long durationInMillis) {
  89 + try {
  90 + List<ReceivedMessage> messages = receiveMessages();
  91 + if (!messages.isEmpty()) {
  92 + return messages.stream().map(ReceivedMessage::getMessage).collect(Collectors.toList());
  93 + }
  94 + } catch (ExecutionException | InterruptedException e) {
  95 + if (stopped) {
  96 + log.info("[{}] Pub/Sub consumer is stopped.", topic);
  97 + } else {
  98 + log.error("Failed to receive messages", e);
  99 + }
  100 + }
  101 + return Collections.emptyList();
99 } 102 }
100 103
101 @Override 104 @Override
102 - public void subscribe() {  
103 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
104 - subscribed = false; 105 + protected void doSubscribe(List<String> topicNames) {
  106 + subscriptionNames = new LinkedHashSet<>(topicNames);
  107 + subscriptionNames.forEach(admin::createTopicIfNotExists);
  108 + initNewExecutor(subscriptionNames.size() + 1);
  109 + messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();
105 } 110 }
106 111
107 @Override 112 @Override
108 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
109 - this.partitions = partitions;  
110 - subscribed = false; 113 + protected void doCommit() {
  114 + acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall);
  115 + acknowledgeRequests.clear();
111 } 116 }
112 117
113 @Override 118 @Override
114 - public void unsubscribe() {  
115 - stopped = true;  
116 - if (consumerExecutor != null) {  
117 - consumerExecutor.shutdownNow();  
118 - }  
119 - 119 + protected void doUnsubscribe() {
120 if (subscriber != null) { 120 if (subscriber != null) {
121 subscriber.close(); 121 subscriber.close();
122 } 122 }
123 - }  
124 -  
125 - @Override  
126 - public List<T> poll(long durationInMillis) {  
127 - if (!subscribed && partitions == null) {  
128 - try {  
129 - Thread.sleep(durationInMillis);  
130 - } catch (InterruptedException e) {  
131 - log.debug("Failed to await subscription", e);  
132 - }  
133 - } else {  
134 - if (!subscribed) {  
135 - subscriptionNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toSet());  
136 - subscriptionNames.forEach(admin::createTopicIfNotExists);  
137 - consumerExecutor = Executors.newFixedThreadPool(subscriptionNames.size());  
138 - messagesPerTopic = pubSubSettings.getMaxMessages() / subscriptionNames.size();  
139 - subscribed = true;  
140 - }  
141 - List<ReceivedMessage> messages;  
142 - try {  
143 - messages = receiveMessages();  
144 - if (!messages.isEmpty()) {  
145 - List<T> result = new ArrayList<>();  
146 - messages.forEach(msg -> {  
147 - try {  
148 - result.add(decode(msg.getMessage()));  
149 - } catch (InvalidProtocolBufferException e) {  
150 - log.error("Failed decode record: [{}]", msg);  
151 - }  
152 - });  
153 - return result;  
154 - }  
155 - } catch (ExecutionException | InterruptedException e) {  
156 - if (stopped) {  
157 - log.info("[{}] Pub/Sub consumer is stopped.", topic);  
158 - } else {  
159 - log.error("Failed to receive messages", e);  
160 - }  
161 - }  
162 - }  
163 - return Collections.emptyList();  
164 - }  
165 -  
166 - @Override  
167 - public void commit() {  
168 - acknowledgeRequests.forEach(subscriber.acknowledgeCallable()::futureCall);  
169 - acknowledgeRequests.clear(); 123 + shutdownExecutor();
170 } 124 }
171 125
172 private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException { 126 private List<ReceivedMessage> receiveMessages() throws ExecutionException, InterruptedException {
@@ -175,7 +129,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -175,7 +129,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
175 PullRequest pullRequest = 129 PullRequest pullRequest =
176 PullRequest.newBuilder() 130 PullRequest.newBuilder()
177 .setMaxMessages(messagesPerTopic) 131 .setMaxMessages(messagesPerTopic)
178 - .setReturnImmediately(false) // return immediately if messages are not available 132 +// .setReturnImmediately(false) // return immediately if messages are not available
179 .setSubscription(subscriptionName) 133 .setSubscription(subscriptionName)
180 .build(); 134 .build();
181 135
@@ -211,6 +165,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -211,6 +165,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
211 return transform.get(); 165 return transform.get();
212 } 166 }
213 167
  168 + @Override
214 public T decode(PubsubMessage message) throws InvalidProtocolBufferException { 169 public T decode(PubsubMessage message) throws InvalidProtocolBufferException {
215 DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class); 170 DefaultTbQueueMsg msg = gson.fromJson(message.getData().toStringUtf8(), DefaultTbQueueMsg.class);
216 return decoder.decode(msg); 171 return decoder.decode(msg);
@@ -49,7 +49,7 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr @@ -49,7 +49,7 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
49 49
50 private final Map<String, Publisher> publisherMap = new ConcurrentHashMap<>(); 50 private final Map<String, Publisher> publisherMap = new ConcurrentHashMap<>();
51 51
52 - private ExecutorService pubExecutor = Executors.newCachedThreadPool(); 52 + private final ExecutorService pubExecutor = Executors.newCachedThreadPool();
53 53
54 public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) { 54 public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) {
55 this.defaultTopic = defaultTopic; 55 this.defaultTopic = defaultTopic;
@@ -124,8 +124,8 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr @@ -124,8 +124,8 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
124 publisherMap.put(topic, publisher); 124 publisherMap.put(topic, publisher);
125 return publisher; 125 return publisher;
126 } catch (IOException e) { 126 } catch (IOException e) {
127 - log.error("Failed to create topic [{}].", topic, e);  
128 - throw new RuntimeException("Failed to create topic.", e); 127 + log.error("Failed to create Publisher for the topic [{}].", topic, e);
  128 + throw new RuntimeException("Failed to create Publisher for the topic.", e);
129 } 129 }
130 } 130 }
131 131
@@ -27,13 +27,11 @@ import java.util.concurrent.TimeoutException; @@ -27,13 +27,11 @@ import java.util.concurrent.TimeoutException;
27 @Slf4j 27 @Slf4j
28 public class TbRabbitMqAdmin implements TbQueueAdmin { 28 public class TbRabbitMqAdmin implements TbQueueAdmin {
29 29
30 - private final TbRabbitMqSettings rabbitMqSettings;  
31 private final Channel channel; 30 private final Channel channel;
32 private final Connection connection; 31 private final Connection connection;
33 private final Map<String, Object> arguments; 32 private final Map<String, Object> arguments;
34 33
35 public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings, Map<String, Object> arguments) { 34 public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings, Map<String, Object> arguments) {
36 - this.rabbitMqSettings = rabbitMqSettings;  
37 this.arguments = arguments; 35 this.arguments = arguments;
38 36
39 try { 37 try {
@@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse; @@ -23,9 +23,9 @@ import com.rabbitmq.client.GetResponse;
23 import lombok.extern.slf4j.Slf4j; 23 import lombok.extern.slf4j.Slf4j;
24 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 24 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
25 import org.thingsboard.server.queue.TbQueueAdmin; 25 import org.thingsboard.server.queue.TbQueueAdmin;
26 -import org.thingsboard.server.queue.TbQueueConsumer;  
27 import org.thingsboard.server.queue.TbQueueMsg; 26 import org.thingsboard.server.queue.TbQueueMsg;
28 import org.thingsboard.server.queue.TbQueueMsgDecoder; 27 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  28 +import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
29 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 29 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
30 30
31 import java.io.IOException; 31 import java.io.IOException;
@@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException; @@ -37,33 +37,26 @@ import java.util.concurrent.TimeoutException;
37 import java.util.stream.Collectors; 37 import java.util.stream.Collectors;
38 38
39 @Slf4j 39 @Slf4j
40 -public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 40 +public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<GetResponse, T> {
41 41
42 private final Gson gson = new Gson(); 42 private final Gson gson = new Gson();
43 private final TbQueueAdmin admin; 43 private final TbQueueAdmin admin;
44 - private final String topic;  
45 private final TbQueueMsgDecoder<T> decoder; 44 private final TbQueueMsgDecoder<T> decoder;
46 - private final TbRabbitMqSettings rabbitMqSettings;  
47 private final Channel channel; 45 private final Channel channel;
48 private final Connection connection; 46 private final Connection connection;
49 47
50 - private volatile Set<TopicPartitionInfo> partitions;  
51 - private volatile boolean subscribed;  
52 private volatile Set<String> queues; 48 private volatile Set<String> queues;
53 - private volatile boolean stopped;  
54 49
55 public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) { 50 public TbRabbitMqConsumerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  51 + super(topic);
56 this.admin = admin; 52 this.admin = admin;
57 this.decoder = decoder; 53 this.decoder = decoder;
58 - this.topic = topic;  
59 - this.rabbitMqSettings = rabbitMqSettings;  
60 try { 54 try {
61 connection = rabbitMqSettings.getConnectionFactory().newConnection(); 55 connection = rabbitMqSettings.getConnectionFactory().newConnection();
62 } catch (IOException | TimeoutException e) { 56 } catch (IOException | TimeoutException e) {
63 log.error("Failed to create connection.", e); 57 log.error("Failed to create connection.", e);
64 throw new RuntimeException("Failed to create connection.", e); 58 throw new RuntimeException("Failed to create connection.", e);
65 } 59 }
66 -  
67 try { 60 try {
68 channel = connection.createChannel(); 61 channel = connection.createChannel();
69 } catch (IOException e) { 62 } catch (IOException e) {
@@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue @@ -74,25 +67,42 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue
74 } 67 }
75 68
76 @Override 69 @Override
77 - public String getTopic() {  
78 - return topic; 70 + protected List<GetResponse> doPoll(long durationInMillis) {
  71 + List<GetResponse> result = queues.stream()
  72 + .map(queue -> {
  73 + try {
  74 + return channel.basicGet(queue, false);
  75 + } catch (IOException e) {
  76 + log.error("Failed to get messages from queue: [{}]", queue);
  77 + throw new RuntimeException("Failed to get messages from queue.", e);
  78 + }
  79 + }).filter(Objects::nonNull).collect(Collectors.toList());
  80 + if (result.size() > 0) {
  81 + return result;
  82 + } else {
  83 + return Collections.emptyList();
  84 + }
79 } 85 }
80 86
81 @Override 87 @Override
82 - public void subscribe() {  
83 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
84 - subscribed = false; 88 + protected void doSubscribe(List<String> topicNames) {
  89 + queues = partitions.stream()
  90 + .map(TopicPartitionInfo::getFullTopicName)
  91 + .collect(Collectors.toSet());
  92 + queues.forEach(admin::createTopicIfNotExists);
85 } 93 }
86 94
87 @Override 95 @Override
88 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
89 - this.partitions = partitions;  
90 - subscribed = false; 96 + protected void doCommit() {
  97 + try {
  98 + channel.basicAck(0, true);
  99 + } catch (IOException e) {
  100 + log.error("Failed to ack messages.", e);
  101 + }
91 } 102 }
92 103
93 @Override 104 @Override
94 - public void unsubscribe() {  
95 - stopped = true; 105 + protected void doUnsubscribe() {
96 if (channel != null) { 106 if (channel != null) {
97 try { 107 try {
98 channel.close(); 108 channel.close();
@@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue @@ -109,63 +119,6 @@ public class TbRabbitMqConsumerTemplate<T extends TbQueueMsg> implements TbQueue
109 } 119 }
110 } 120 }
111 121
112 - @Override  
113 - public List<T> poll(long durationInMillis) {  
114 - if (!subscribed && partitions == null) {  
115 - try {  
116 - Thread.sleep(durationInMillis);  
117 - } catch (InterruptedException e) {  
118 - log.debug("Failed to await subscription", e);  
119 - }  
120 - } else {  
121 - if (!subscribed) {  
122 - queues = partitions.stream()  
123 - .map(TopicPartitionInfo::getFullTopicName)  
124 - .collect(Collectors.toSet());  
125 -  
126 - queues.forEach(admin::createTopicIfNotExists);  
127 - subscribed = true;  
128 - }  
129 -  
130 - List<T> result = queues.stream()  
131 - .map(queue -> {  
132 - try {  
133 - return channel.basicGet(queue, false);  
134 - } catch (IOException e) {  
135 - log.error("Failed to get messages from queue: [{}]", queue);  
136 - throw new RuntimeException("Failed to get messages from queue.", e);  
137 - }  
138 - }).filter(Objects::nonNull).map(message -> {  
139 - try {  
140 - return decode(message);  
141 - } catch (InvalidProtocolBufferException e) {  
142 - log.error("Failed to decode message: [{}].", message);  
143 - throw new RuntimeException("Failed to decode message.", e);  
144 - }  
145 - }).collect(Collectors.toList());  
146 - if (result.size() > 0) {  
147 - return result;  
148 - }  
149 - }  
150 - try {  
151 - Thread.sleep(durationInMillis);  
152 - } catch (InterruptedException e) {  
153 - if (!stopped) {  
154 - log.error("Failed to wait.", e);  
155 - }  
156 - }  
157 - return Collections.emptyList();  
158 - }  
159 -  
160 - @Override  
161 - public void commit() {  
162 - try {  
163 - channel.basicAck(0, true);  
164 - } catch (IOException e) {  
165 - log.error("Failed to ack messages.", e);  
166 - }  
167 - }  
168 -  
169 public T decode(GetResponse message) throws InvalidProtocolBufferException { 122 public T decode(GetResponse message) throws InvalidProtocolBufferException {
170 DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class); 123 DefaultTbQueueMsg msg = gson.fromJson(new String(message.getBody()), DefaultTbQueueMsg.class);
171 return decoder.decode(msg); 124 return decoder.decode(msg);
@@ -30,6 +30,8 @@ import org.thingsboard.server.queue.TbQueueProducer; @@ -30,6 +30,8 @@ import org.thingsboard.server.queue.TbQueueProducer;
30 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 30 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
31 31
32 import java.io.IOException; 32 import java.io.IOException;
  33 +import java.util.Set;
  34 +import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.Executors; 35 import java.util.concurrent.Executors;
34 import java.util.concurrent.TimeoutException; 36 import java.util.concurrent.TimeoutException;
35 37
@@ -39,10 +41,12 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue @@ -39,10 +41,12 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
39 private final Gson gson = new Gson(); 41 private final Gson gson = new Gson();
40 private final TbQueueAdmin admin; 42 private final TbQueueAdmin admin;
41 private final TbRabbitMqSettings rabbitMqSettings; 43 private final TbRabbitMqSettings rabbitMqSettings;
42 - private ListeningExecutorService producerExecutor; 44 + private final ListeningExecutorService producerExecutor;
43 private final Channel channel; 45 private final Channel channel;
44 private final Connection connection; 46 private final Connection connection;
45 47
  48 + private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
  49 +
46 public TbRabbitMqProducerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String defaultTopic) { 50 public TbRabbitMqProducerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
47 this.admin = admin; 51 this.admin = admin;
48 this.defaultTopic = defaultTopic; 52 this.defaultTopic = defaultTopic;
@@ -75,6 +79,7 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue @@ -75,6 +79,7 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
75 79
76 @Override 80 @Override
77 public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) { 81 public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
  82 + createTopicIfNotExist(tpi);
78 AMQP.BasicProperties properties = new AMQP.BasicProperties(); 83 AMQP.BasicProperties properties = new AMQP.BasicProperties();
79 try { 84 try {
80 channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes()); 85 channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes());
@@ -110,4 +115,11 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue @@ -110,4 +115,11 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
110 } 115 }
111 } 116 }
112 117
  118 + private void createTopicIfNotExist(TopicPartitionInfo tpi) {
  119 + if (topics.contains(tpi)) {
  120 + return;
  121 + }
  122 + admin.createTopicIfNotExists(tpi.getFullTopicName());
  123 + topics.add(tpi);
  124 + }
113 } 125 }
@@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message; @@ -25,21 +25,17 @@ import com.amazonaws.services.sqs.model.Message;
25 import com.amazonaws.services.sqs.model.ReceiveMessageRequest; 25 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
26 import com.google.common.util.concurrent.Futures; 26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture; 27 import com.google.common.util.concurrent.ListenableFuture;
28 -import com.google.common.util.concurrent.ListeningExecutorService;  
29 -import com.google.common.util.concurrent.MoreExecutors;  
30 import com.google.gson.Gson; 28 import com.google.gson.Gson;
31 import com.google.protobuf.InvalidProtocolBufferException; 29 import com.google.protobuf.InvalidProtocolBufferException;
32 import lombok.Data; 30 import lombok.Data;
33 import lombok.extern.slf4j.Slf4j; 31 import lombok.extern.slf4j.Slf4j;
34 import org.springframework.util.CollectionUtils; 32 import org.springframework.util.CollectionUtils;
35 -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;  
36 import org.thingsboard.server.queue.TbQueueAdmin; 33 import org.thingsboard.server.queue.TbQueueAdmin;
37 -import org.thingsboard.server.queue.TbQueueConsumer;  
38 import org.thingsboard.server.queue.TbQueueMsg; 34 import org.thingsboard.server.queue.TbQueueMsg;
39 import org.thingsboard.server.queue.TbQueueMsgDecoder; 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
  36 +import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
40 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
41 38
42 -import java.io.IOException;  
43 import java.util.ArrayList; 39 import java.util.ArrayList;
44 import java.util.Collections; 40 import java.util.Collections;
45 import java.util.List; 41 import java.util.List;
@@ -47,34 +43,28 @@ import java.util.Objects; @@ -47,34 +43,28 @@ import java.util.Objects;
47 import java.util.Set; 43 import java.util.Set;
48 import java.util.concurrent.CopyOnWriteArrayList; 44 import java.util.concurrent.CopyOnWriteArrayList;
49 import java.util.concurrent.ExecutionException; 45 import java.util.concurrent.ExecutionException;
50 -import java.util.concurrent.Executors;  
51 import java.util.concurrent.TimeUnit; 46 import java.util.concurrent.TimeUnit;
52 import java.util.stream.Collectors; 47 import java.util.stream.Collectors;
53 import java.util.stream.Stream; 48 import java.util.stream.Stream;
54 49
55 @Slf4j 50 @Slf4j
56 -public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueConsumer<T> { 51 +public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractParallelTbQueueConsumerTemplate<Message, T> {
57 52
58 private static final int MAX_NUM_MSGS = 10; 53 private static final int MAX_NUM_MSGS = 10;
59 54
60 private final Gson gson = new Gson(); 55 private final Gson gson = new Gson();
61 private final TbQueueAdmin admin; 56 private final TbQueueAdmin admin;
62 private final AmazonSQS sqsClient; 57 private final AmazonSQS sqsClient;
63 - private final String topic;  
64 private final TbQueueMsgDecoder<T> decoder; 58 private final TbQueueMsgDecoder<T> decoder;
65 private final TbAwsSqsSettings sqsSettings; 59 private final TbAwsSqsSettings sqsSettings;
66 60
67 private final List<AwsSqsMsgWrapper> pendingMessages = new CopyOnWriteArrayList<>(); 61 private final List<AwsSqsMsgWrapper> pendingMessages = new CopyOnWriteArrayList<>();
68 private volatile Set<String> queueUrls; 62 private volatile Set<String> queueUrls;
69 - private volatile Set<TopicPartitionInfo> partitions;  
70 - private ListeningExecutorService consumerExecutor;  
71 - private volatile boolean subscribed;  
72 - private volatile boolean stopped = false;  
73 63
74 public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder<T> decoder) { 64 public TbAwsSqsConsumerTemplate(TbQueueAdmin admin, TbAwsSqsSettings sqsSettings, String topic, TbQueueMsgDecoder<T> decoder) {
  65 + super(topic);
75 this.admin = admin; 66 this.admin = admin;
76 this.decoder = decoder; 67 this.decoder = decoder;
77 - this.topic = topic;  
78 this.sqsSettings = sqsSettings; 68 this.sqsSettings = sqsSettings;
79 69
80 AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); 70 AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
@@ -87,81 +77,60 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -87,81 +77,60 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
87 } 77 }
88 78
89 @Override 79 @Override
90 - public String getTopic() {  
91 - return topic; 80 + protected void doSubscribe(List<String> topicNames) {
  81 + queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());
  82 + initNewExecutor(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1);
92 } 83 }
93 84
94 @Override 85 @Override
95 - public void subscribe() {  
96 - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true));  
97 - subscribed = false; 86 + protected List<Message> doPoll(long durationInMillis) {
  87 + int duration = (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis);
  88 + List<ListenableFuture<List<Message>>> futureList = queueUrls
  89 + .stream()
  90 + .map(url -> poll(url, duration))
  91 + .collect(Collectors.toList());
  92 + ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);
  93 + try {
  94 + return futureResult.get().stream()
  95 + .flatMap(List::stream)
  96 + .filter(Objects::nonNull)
  97 + .collect(Collectors.toList());
  98 + } catch (InterruptedException | ExecutionException e) {
  99 + if (stopped) {
  100 + log.info("[{}] Aws SQS consumer is stopped.", getTopic());
  101 + } else {
  102 + log.error("Failed to pool messages.", e);
  103 + }
  104 + return Collections.emptyList();
  105 + }
98 } 106 }
99 107
100 @Override 108 @Override
101 - public void subscribe(Set<TopicPartitionInfo> partitions) {  
102 - this.partitions = partitions;  
103 - subscribed = false; 109 + public T decode(Message message) throws InvalidProtocolBufferException {
  110 + DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);
  111 + return decoder.decode(msg);
104 } 112 }
105 113
106 @Override 114 @Override
107 - public void unsubscribe() {  
108 - stopped = true;  
109 -  
110 - if (sqsClient != null) {  
111 - sqsClient.shutdown();  
112 - }  
113 - if (consumerExecutor != null) {  
114 - consumerExecutor.shutdownNow();  
115 - } 115 + protected void doCommit() {
  116 + pendingMessages.forEach(msg ->
  117 + consumerExecutor.submit(() -> {
  118 + List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()
  119 + .stream()
  120 + .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))
  121 + .collect(Collectors.toList());
  122 + sqsClient.deleteMessageBatch(msg.getUrl(), entries);
  123 + }));
  124 + pendingMessages.clear();
116 } 125 }
117 126
118 @Override 127 @Override
119 - public List<T> poll(long durationInMillis) {  
120 - if (!subscribed && partitions == null) {  
121 - try {  
122 - Thread.sleep(durationInMillis);  
123 - } catch (InterruptedException e) {  
124 - log.debug("Failed to await subscription", e);  
125 - }  
126 - } else {  
127 - if (!subscribed) {  
128 - List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());  
129 - queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet());  
130 - consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1));  
131 - subscribed = true;  
132 - }  
133 -  
134 - if (!pendingMessages.isEmpty()) {  
135 - log.warn("Present {} non committed messages.", pendingMessages.size());  
136 - return Collections.emptyList();  
137 - }  
138 -  
139 - List<ListenableFuture<List<Message>>> futureList = queueUrls  
140 - .stream()  
141 - .map(url -> poll(url, (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis)))  
142 - .collect(Collectors.toList());  
143 - ListenableFuture<List<List<Message>>> futureResult = Futures.allAsList(futureList);  
144 - try {  
145 - return futureResult.get().stream()  
146 - .flatMap(List::stream)  
147 - .map(msg -> {  
148 - try {  
149 - return decode(msg);  
150 - } catch (IOException e) {  
151 - log.error("Failed to decode message: [{}]", msg);  
152 - return null;  
153 - }  
154 - }).filter(Objects::nonNull)  
155 - .collect(Collectors.toList());  
156 - } catch (InterruptedException | ExecutionException e) {  
157 - if (stopped) {  
158 - log.info("[{}] Aws SQS consumer is stopped.", topic);  
159 - } else {  
160 - log.error("Failed to pool messages.", e);  
161 - }  
162 - } 128 + protected void doUnsubscribe() {
  129 + stopped = true;
  130 + if (sqsClient != null) {
  131 + sqsClient.shutdown();
163 } 132 }
164 - return Collections.emptyList(); 133 + shutdownExecutor();
165 } 134 }
166 135
167 private ListenableFuture<List<Message>> poll(String url, int waitTimeSeconds) { 136 private ListenableFuture<List<Message>> poll(String url, int waitTimeSeconds) {
@@ -172,7 +141,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -172,7 +141,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
172 ReceiveMessageRequest request = new ReceiveMessageRequest(); 141 ReceiveMessageRequest request = new ReceiveMessageRequest();
173 request 142 request
174 .withWaitTimeSeconds(waitTimeSeconds) 143 .withWaitTimeSeconds(waitTimeSeconds)
175 - .withMessageAttributeNames("headers")  
176 .withQueueUrl(url) 144 .withQueueUrl(url)
177 .withMaxNumberOfMessages(MAX_NUM_MSGS); 145 .withMaxNumberOfMessages(MAX_NUM_MSGS);
178 return sqsClient.receiveMessage(request).getMessages(); 146 return sqsClient.receiveMessage(request).getMessages();
@@ -194,25 +162,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo @@ -194,25 +162,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo
194 }, consumerExecutor); 162 }, consumerExecutor);
195 } 163 }
196 164
197 - @Override  
198 - public void commit() {  
199 - pendingMessages.forEach(msg ->  
200 - consumerExecutor.submit(() -> {  
201 - List<DeleteMessageBatchRequestEntry> entries = msg.getMessages()  
202 - .stream()  
203 - .map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle()))  
204 - .collect(Collectors.toList());  
205 - sqsClient.deleteMessageBatch(msg.getUrl(), entries);  
206 - }));  
207 -  
208 - pendingMessages.clear();  
209 - }  
210 -  
211 - public T decode(Message message) throws InvalidProtocolBufferException {  
212 - DefaultTbQueueMsg msg = gson.fromJson(message.getBody(), DefaultTbQueueMsg.class);  
213 - return decoder.decode(msg);  
214 - }  
215 -  
216 @Data 165 @Data
217 private static class AwsSqsMsgWrapper { 166 private static class AwsSqsMsgWrapper {
218 private final String url; 167 private final String url;
@@ -37,6 +37,7 @@ import org.thingsboard.server.queue.TbQueueProducer; @@ -37,6 +37,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg; 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
38 38
39 import java.util.Map; 39 import java.util.Map;
  40 +import java.util.UUID;
40 import java.util.concurrent.ConcurrentHashMap; 41 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.Executors; 42 import java.util.concurrent.Executors;
42 43
@@ -80,7 +81,9 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr @@ -80,7 +81,9 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
80 sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); 81 sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName()));
81 sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg))); 82 sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg)));
82 83
83 - sendMsgRequest.withMessageGroupId(msg.getKey().toString()); 84 + sendMsgRequest.withMessageGroupId(tpi.getTopic());
  85 + sendMsgRequest.withMessageDeduplicationId(UUID.randomUUID().toString());
  86 +
84 ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); 87 ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
85 88
86 Futures.addCallback(future, new FutureCallback<SendMessageResult>() { 89 Futures.addCallback(future, new FutureCallback<SendMessageResult>() {
@@ -55,7 +55,6 @@ public class TbAwsSqsQueueAttributes { @@ -55,7 +55,6 @@ public class TbAwsSqsQueueAttributes {
55 @PostConstruct 55 @PostConstruct
56 private void init() { 56 private void init() {
57 defaultAttributes.put(QueueAttributeName.FifoQueue.toString(), "true"); 57 defaultAttributes.put(QueueAttributeName.FifoQueue.toString(), "true");
58 - defaultAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true");  
59 58
60 coreAttributes = getConfigs(coreProperties); 59 coreAttributes = getConfigs(coreProperties);
61 ruleEngineAttributes = getConfigs(ruleEngineProperties); 60 ruleEngineAttributes = getConfigs(ruleEngineProperties);
1 - 1 +TB_QUEUE_TYPE=kafka
2 REMOTE_JS_EVAL_REQUEST_TOPIC=js_eval.requests 2 REMOTE_JS_EVAL_REQUEST_TOPIC=js_eval.requests
3 TB_KAFKA_SERVERS=kafka:9092 3 TB_KAFKA_SERVERS=kafka:9092
4 LOGGER_LEVEL=info 4 LOGGER_LEVEL=info
@@ -14,7 +14,7 @@ @@ -14,7 +14,7 @@
14 # limitations under the License. 14 # limitations under the License.
15 # 15 #
16 16
17 -service-type: "TB_SERVICE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) 17 +queue_type: "TB_QUEUE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
18 request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" 18 request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC"
19 19
20 js: 20 js:
@@ -25,18 +25,18 @@ kafka: @@ -25,18 +25,18 @@ kafka:
25 # Kafka Bootstrap Servers 25 # Kafka Bootstrap Servers
26 servers: "TB_KAFKA_SERVERS" 26 servers: "TB_KAFKA_SERVERS"
27 replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR" 27 replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR"
28 - topic-properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES" 28 + topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
29 29
30 pubsub: 30 pubsub:
31 project_id: "TB_QUEUE_PUBSUB_PROJECT_ID" 31 project_id: "TB_QUEUE_PUBSUB_PROJECT_ID"
32 service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT" 32 service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT"
33 - queue-properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES" 33 + queue_properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES"
34 34
35 aws_sqs: 35 aws_sqs:
36 access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID" 36 access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID"
37 secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY" 37 secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY"
38 region: "TB_QUEUE_AWS_SQS_REGION" 38 region: "TB_QUEUE_AWS_SQS_REGION"
39 - queue-properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES" 39 + queue_properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES"
40 40
41 rabbitmq: 41 rabbitmq:
42 host: "TB_QUEUE_RABBIT_MQ_HOST" 42 host: "TB_QUEUE_RABBIT_MQ_HOST"
@@ -44,14 +44,14 @@ rabbitmq: @@ -44,14 +44,14 @@ rabbitmq:
44 virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST" 44 virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST"
45 username: "TB_QUEUE_RABBIT_MQ_USERNAME" 45 username: "TB_QUEUE_RABBIT_MQ_USERNAME"
46 password: "TB_QUEUE_RABBIT_MQ_PASSWORD" 46 password: "TB_QUEUE_RABBIT_MQ_PASSWORD"
47 - queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES" 47 + queue_properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES"
48 48
49 service_bus: 49 service_bus:
50 namespace_name: "TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME" 50 namespace_name: "TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME"
51 sas_key_name: "TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME" 51 sas_key_name: "TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME"
52 sas_key: "TB_QUEUE_SERVICE_BUS_SAS_KEY" 52 sas_key: "TB_QUEUE_SERVICE_BUS_SAS_KEY"
53 max_messages: "TB_QUEUE_SERVICE_BUS_MAX_MESSAGES" 53 max_messages: "TB_QUEUE_SERVICE_BUS_MAX_MESSAGES"
54 - queue-properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES" 54 + queue_properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES"
55 55
56 logger: 56 logger:
57 level: "LOGGER_LEVEL" 57 level: "LOGGER_LEVEL"
@@ -14,7 +14,7 @@ @@ -14,7 +14,7 @@
14 # limitations under the License. 14 # limitations under the License.
15 # 15 #
16 16
17 -service-type: "kafka" 17 +queue_type: "kafka"
18 request_topic: "js_eval.requests" 18 request_topic: "js_eval.requests"
19 19
20 js: 20 js:
@@ -25,13 +25,13 @@ kafka: @@ -25,13 +25,13 @@ kafka:
25 # Kafka Bootstrap Servers 25 # Kafka Bootstrap Servers
26 servers: "localhost:9092" 26 servers: "localhost:9092"
27 replication_factor: "1" 27 replication_factor: "1"
28 - topic-properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600" 28 + topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
29 29
30 pubsub: 30 pubsub:
31 - queue-properties: "ackDeadlineInSec:30;messageRetentionInSec:604800" 31 + queue_properties: "ackDeadlineInSec:30;messageRetentionInSec:604800"
32 32
33 aws_sqs: 33 aws_sqs:
34 - queue-properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800" 34 + queue_properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800"
35 35
36 rabbitmq: 36 rabbitmq:
37 host: "localhost" 37 host: "localhost"
@@ -39,10 +39,10 @@ rabbitmq: @@ -39,10 +39,10 @@ rabbitmq:
39 virtual_host: "/" 39 virtual_host: "/"
40 username: "admin" 40 username: "admin"
41 password: "password" 41 password: "password"
42 - queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000" 42 + queue_properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000"
43 43
44 service_bus: 44 service_bus:
45 - queue-properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800" 45 + queue_properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800"
46 46
47 logger: 47 logger:
48 level: "info" 48 level: "info"
@@ -22,6 +22,7 @@ @@ -22,6 +22,7 @@
22 "azure-sb": "^0.11.1", 22 "azure-sb": "^0.11.1",
23 "long": "^4.0.0", 23 "long": "^4.0.0",
24 "uuid-parse": "^1.0.0", 24 "uuid-parse": "^1.0.0",
  25 + "uuid-random": "^1.3.0",
25 "winston": "^3.0.0", 26 "winston": "^3.0.0",
26 "winston-daily-rotate-file": "^3.2.1" 27 "winston-daily-rotate-file": "^3.2.1"
27 }, 28 },
@@ -19,6 +19,7 @@ @@ -19,6 +19,7 @@
19 const config = require('config'), 19 const config = require('config'),
20 JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), 20 JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
21 logger = require('../config/logger')._logger('awsSqsTemplate'); 21 logger = require('../config/logger')._logger('awsSqsTemplate');
  22 +const uuid = require('uuid-random');
22 23
23 const requestTopic = config.get('request_topic'); 24 const requestTopic = config.get('request_topic');
24 25
@@ -26,10 +27,10 @@ const accessKeyId = config.get('aws_sqs.access_key_id'); @@ -26,10 +27,10 @@ const accessKeyId = config.get('aws_sqs.access_key_id');
26 const secretAccessKey = config.get('aws_sqs.secret_access_key'); 27 const secretAccessKey = config.get('aws_sqs.secret_access_key');
27 const region = config.get('aws_sqs.region'); 28 const region = config.get('aws_sqs.region');
28 const AWS = require('aws-sdk'); 29 const AWS = require('aws-sdk');
29 -const queueProperties = config.get('aws_sqs.queue-properties');  
30 -const poolInterval = config.get('js.response_poll_interval'); 30 +const queueProperties = config.get('aws_sqs.queue_properties');
  31 +const pollInterval = config.get('js.response_poll_interval');
31 32
32 -let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'}; 33 +let queueAttributes = {FifoQueue: 'true'};
33 let sqsClient; 34 let sqsClient;
34 let requestQueueURL; 35 let requestQueueURL;
35 const queueUrls = new Map(); 36 const queueUrls = new Map();
@@ -51,7 +52,12 @@ function AwsSqsProducer() { @@ -51,7 +52,12 @@ function AwsSqsProducer() {
51 queueUrls.set(responseTopic, responseQueueUrl); 52 queueUrls.set(responseTopic, responseQueueUrl);
52 } 53 }
53 54
54 - let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId}; 55 + let params = {
  56 + MessageBody: msgBody,
  57 + QueueUrl: responseQueueUrl,
  58 + MessageGroupId: 'js_eval',
  59 + MessageDeduplicationId: uuid()
  60 + };
55 61
56 return new Promise((resolve, reject) => { 62 return new Promise((resolve, reject) => {
57 sqsClient.sendMessage(params, function (err, data) { 63 sqsClient.sendMessage(params, function (err, data) {
@@ -74,11 +80,13 @@ function AwsSqsProducer() { @@ -74,11 +80,13 @@ function AwsSqsProducer() {
74 80
75 const queues = await getQueues(); 81 const queues = await getQueues();
76 82
77 - queues.forEach(queueUrl => {  
78 - const delimiterPosition = queueUrl.lastIndexOf('/');  
79 - const queueName = queueUrl.substring(delimiterPosition + 1);  
80 - queueUrls.set(queueName, queueUrl);  
81 - }) 83 + if (queues) {
  84 + queues.forEach(queueUrl => {
  85 + const delimiterPosition = queueUrl.lastIndexOf('/');
  86 + const queueName = queueUrl.substring(delimiterPosition + 1);
  87 + queueUrls.set(queueName, queueUrl);
  88 + });
  89 + }
82 90
83 parseQueueProperties(); 91 parseQueueProperties();
84 92
@@ -95,6 +103,7 @@ function AwsSqsProducer() { @@ -95,6 +103,7 @@ function AwsSqsProducer() {
95 WaitTimeSeconds: poolInterval / 1000 103 WaitTimeSeconds: poolInterval / 1000
96 }; 104 };
97 while (!stopped) { 105 while (!stopped) {
  106 + let pollStartTs = new Date().getTime();
98 const messages = await new Promise((resolve, reject) => { 107 const messages = await new Promise((resolve, reject) => {
99 sqsClient.receiveMessage(params, function (err, data) { 108 sqsClient.receiveMessage(params, function (err, data) {
100 if (err) { 109 if (err) {
@@ -127,6 +136,11 @@ function AwsSqsProducer() { @@ -127,6 +136,11 @@ function AwsSqsProducer() {
127 //do nothing 136 //do nothing
128 } 137 }
129 }); 138 });
  139 + } else {
  140 + let pollDuration = new Date().getTime() - pollStartTs;
  141 + if (pollDuration < pollInterval) {
  142 + await sleep(pollInterval - pollDuration);
  143 + }
130 } 144 }
131 } 145 }
132 } catch (e) { 146 } catch (e) {
@@ -175,6 +189,12 @@ function parseQueueProperties() { @@ -175,6 +189,12 @@ function parseQueueProperties() {
175 }); 189 });
176 } 190 }
177 191
  192 +function sleep(ms) {
  193 + return new Promise((resolve) => {
  194 + setTimeout(resolve, ms);
  195 + });
  196 +}
  197 +
178 process.on('exit', () => { 198 process.on('exit', () => {
179 stopped = true; 199 stopped = true;
180 logger.info('Aws Sqs client stopped.'); 200 logger.info('Aws Sqs client stopped.');
@@ -20,7 +20,7 @@ const config = require('config'), @@ -20,7 +20,7 @@ const config = require('config'),
20 logger = require('../config/logger')._logger('kafkaTemplate'), 20 logger = require('../config/logger')._logger('kafkaTemplate'),
21 KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; 21 KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator;
22 const replicationFactor = config.get('kafka.replication_factor'); 22 const replicationFactor = config.get('kafka.replication_factor');
23 -const topicProperties = config.get('kafka.topic-properties'); 23 +const topicProperties = config.get('kafka.topic_properties');
24 24
25 let kafkaClient; 25 let kafkaClient;
26 let kafkaAdmin; 26 let kafkaAdmin;
@@ -24,7 +24,7 @@ const {PubSub} = require('@google-cloud/pubsub'); @@ -24,7 +24,7 @@ const {PubSub} = require('@google-cloud/pubsub');
24 const projectId = config.get('pubsub.project_id'); 24 const projectId = config.get('pubsub.project_id');
25 const credentials = JSON.parse(config.get('pubsub.service_account')); 25 const credentials = JSON.parse(config.get('pubsub.service_account'));
26 const requestTopic = config.get('request_topic'); 26 const requestTopic = config.get('request_topic');
27 -const queueProperties = config.get('pubsub.queue-properties'); 27 +const queueProperties = config.get('pubsub.queue_properties');
28 28
29 let pubSubClient; 29 let pubSubClient;
30 30
@@ -98,23 +98,32 @@ function PubSubProducer() { @@ -98,23 +98,32 @@ function PubSubProducer() {
98 98
99 async function createTopic(topic) { 99 async function createTopic(topic) {
100 if (!topics.includes(topic)) { 100 if (!topics.includes(topic)) {
101 - await pubSubClient.createTopic(topic); 101 + try {
  102 + await pubSubClient.createTopic(topic);
  103 + logger.info('Created new Pub/Sub topic: %s', topic);
  104 + } catch (e) {
  105 + logger.info('Pub/Sub topic already exists');
  106 + }
102 topics.push(topic); 107 topics.push(topic);
103 - logger.info('Created new Pub/Sub topic: %s', topic);  
104 } 108 }
105 await createSubscription(topic) 109 await createSubscription(topic)
106 } 110 }
107 111
108 async function createSubscription(topic) { 112 async function createSubscription(topic) {
109 if (!subscriptions.includes(topic)) { 113 if (!subscriptions.includes(topic)) {
110 - await pubSubClient.createSubscription(topic, topic, {  
111 - topic: topic,  
112 - subscription: topic,  
113 - ackDeadlineSeconds: queueProps['ackDeadlineInSec'],  
114 - messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']}  
115 - }); 114 + try {
  115 + await pubSubClient.createSubscription(topic, topic, {
  116 + topic: topic,
  117 + subscription: topic,
  118 + ackDeadlineSeconds: queueProps['ackDeadlineInSec'],
  119 + messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']}
  120 + });
  121 + logger.info('Created new Pub/Sub subscription: %s', topic);
  122 + } catch (e) {
  123 + logger.info('Pub/Sub subscription already exists.');
  124 + }
  125 +
116 subscriptions.push(topic); 126 subscriptions.push(topic);
117 - logger.info('Created new Pub/Sub subscription: %s', topic);  
118 } 127 }
119 } 128 }
120 129
@@ -26,23 +26,23 @@ const port = config.get('rabbitmq.port'); @@ -26,23 +26,23 @@ const port = config.get('rabbitmq.port');
26 const vhost = config.get('rabbitmq.virtual_host'); 26 const vhost = config.get('rabbitmq.virtual_host');
27 const username = config.get('rabbitmq.username'); 27 const username = config.get('rabbitmq.username');
28 const password = config.get('rabbitmq.password'); 28 const password = config.get('rabbitmq.password');
29 -const queueProperties = config.get('rabbitmq.queue-properties');  
30 -const poolInterval = config.get('js.response_poll_interval'); 29 +const queueProperties = config.get('rabbitmq.queue_properties');
  30 +const pollInterval = config.get('js.response_poll_interval');
31 31
32 const amqp = require('amqplib/callback_api'); 32 const amqp = require('amqplib/callback_api');
33 33
34 -let queueParams = {durable: false, exclusive: false, autoDelete: false}; 34 +let queueOptions = {durable: false, exclusive: false, autoDelete: false};
35 let connection; 35 let connection;
36 let channel; 36 let channel;
37 let stopped = false; 37 let stopped = false;
38 -const responseTopics = []; 38 +let queues = [];
39 39
40 function RabbitMqProducer() { 40 function RabbitMqProducer() {
41 this.send = async (responseTopic, scriptId, rawResponse, headers) => { 41 this.send = async (responseTopic, scriptId, rawResponse, headers) => {
42 42
43 - if (!responseTopics.includes(responseTopic)) { 43 + if (!queues.includes(responseTopic)) {
44 await createQueue(responseTopic); 44 await createQueue(responseTopic);
45 - responseTopics.push(responseTopic); 45 + queues.push(responseTopic);
46 } 46 }
47 47
48 let data = JSON.stringify( 48 let data = JSON.stringify(
@@ -98,6 +98,7 @@ function RabbitMqProducer() { @@ -98,6 +98,7 @@ function RabbitMqProducer() {
98 const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer()); 98 const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer());
99 99
100 while (!stopped) { 100 while (!stopped) {
  101 + let pollStartTs = new Date().getTime();
101 let message = await new Promise((resolve, reject) => { 102 let message = await new Promise((resolve, reject) => {
102 channel.get(requestTopic, {}, function (err, msg) { 103 channel.get(requestTopic, {}, function (err, msg) {
103 if (err) { 104 if (err) {
@@ -112,7 +113,10 @@ function RabbitMqProducer() { @@ -112,7 +113,10 @@ function RabbitMqProducer() {
112 messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); 113 messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
113 channel.ack(message); 114 channel.ack(message);
114 } else { 115 } else {
115 - await sleep(poolInterval); 116 + let pollDuration = new Date().getTime() - pollStartTs;
  117 + if (pollDuration < pollInterval) {
  118 + await sleep(pollInterval - pollDuration);
  119 + }
116 } 120 }
117 } 121 }
118 } catch (e) { 122 } catch (e) {
@@ -123,16 +127,18 @@ function RabbitMqProducer() { @@ -123,16 +127,18 @@ function RabbitMqProducer() {
123 })(); 127 })();
124 128
125 function parseQueueProperties() { 129 function parseQueueProperties() {
  130 + let args = {};
126 const props = queueProperties.split(';'); 131 const props = queueProperties.split(';');
127 props.forEach(p => { 132 props.forEach(p => {
128 const delimiterPosition = p.indexOf(':'); 133 const delimiterPosition = p.indexOf(':');
129 - queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); 134 + args[p.substring(0, delimiterPosition)] = +p.substring(delimiterPosition + 1);
130 }); 135 });
  136 + queueOptions['arguments'] = args;
131 } 137 }
132 138
133 -function createQueue(topic) { 139 +async function createQueue(topic) {
134 return new Promise((resolve, reject) => { 140 return new Promise((resolve, reject) => {
135 - channel.assertQueue(topic, queueParams, function (err) { 141 + channel.assertQueue(topic, queueOptions, function (err) {
136 if (err) { 142 if (err) {
137 reject(err); 143 reject(err);
138 } else { 144 } else {
@@ -26,7 +26,7 @@ const requestTopic = config.get('request_topic'); @@ -26,7 +26,7 @@ const requestTopic = config.get('request_topic');
26 const namespaceName = config.get('service_bus.namespace_name'); 26 const namespaceName = config.get('service_bus.namespace_name');
27 const sasKeyName = config.get('service_bus.sas_key_name'); 27 const sasKeyName = config.get('service_bus.sas_key_name');
28 const sasKey = config.get('service_bus.sas_key'); 28 const sasKey = config.get('service_bus.sas_key');
29 -const queueProperties = config.get('service_bus.queue-properties'); 29 +const queueProperties = config.get('service_bus.queue_properties');
30 30
31 let sbClient; 31 let sbClient;
32 let receiverClient; 32 let receiverClient;
@@ -140,6 +140,7 @@ function parseQueueProperties() { @@ -140,6 +140,7 @@ function parseQueueProperties() {
140 properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); 140 properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
141 }); 141 });
142 queueOptions = { 142 queueOptions = {
  143 + DuplicateDetection: 'false',
143 MaxSizeInMegabytes: properties['maxSizeInMb'], 144 MaxSizeInMegabytes: properties['maxSizeInMb'],
144 DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, 145 DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`,
145 LockDuration: `PT${properties['lockDurationInSec']}S` 146 LockDuration: `PT${properties['lockDurationInSec']}S`
@@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
16 16
17 const config = require('config'), logger = require('./config/logger')._logger('main'); 17 const config = require('config'), logger = require('./config/logger')._logger('main');
18 18
19 -const serviceType = config.get('service-type'); 19 +const serviceType = config.get('queue_type');
20 switch (serviceType) { 20 switch (serviceType) {
21 case 'kafka': 21 case 'kafka':
22 logger.info('Starting kafka template.'); 22 logger.info('Starting kafka template.');
@@ -96,7 +96,7 @@ @@ -96,7 +96,7 @@
96 <snakeyaml.version>1.25</snakeyaml.version> 96 <snakeyaml.version>1.25</snakeyaml.version>
97 <struts.version>1.3.10</struts.version> 97 <struts.version>1.3.10</struts.version>
98 <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version> 98 <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version>
99 - <pubsub.client.version>1.84.0</pubsub.client.version> 99 + <pubsub.client.version>1.105.0</pubsub.client.version>
100 <azure-servicebus.version>3.2.0</azure-servicebus.version> 100 <azure-servicebus.version>3.2.0</azure-servicebus.version>
101 <passay.version>1.5.0</passay.version> 101 <passay.version>1.5.0</passay.version>
102 <ua-parser.version>1.4.3</ua-parser.version> 102 <ua-parser.version>1.4.3</ua-parser.version>