Showing
6 changed files
with
18 additions
and
8 deletions
@@ -127,6 +127,11 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo | @@ -127,6 +127,11 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo | ||
127 | if (!subscribed) { | 127 | if (!subscribed) { |
128 | List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); | 128 | List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); |
129 | queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); | 129 | queueUrls = topicNames.stream().map(this::getQueueUrl).collect(Collectors.toSet()); |
130 | + | ||
131 | + if (consumerExecutor != null) { | ||
132 | + consumerExecutor.shutdown(); | ||
133 | + } | ||
134 | + | ||
130 | consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1)); | 135 | consumerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(queueUrls.size() * sqsSettings.getThreadsPerTopic() + 1)); |
131 | subscribed = true; | 136 | subscribed = true; |
132 | } | 137 | } |
@@ -172,7 +177,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo | @@ -172,7 +177,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> implements TbQueueCo | ||
172 | ReceiveMessageRequest request = new ReceiveMessageRequest(); | 177 | ReceiveMessageRequest request = new ReceiveMessageRequest(); |
173 | request | 178 | request |
174 | .withWaitTimeSeconds(waitTimeSeconds) | 179 | .withWaitTimeSeconds(waitTimeSeconds) |
175 | - .withMessageAttributeNames("headers") | ||
176 | .withQueueUrl(url) | 180 | .withQueueUrl(url) |
177 | .withMaxNumberOfMessages(MAX_NUM_MSGS); | 181 | .withMaxNumberOfMessages(MAX_NUM_MSGS); |
178 | return sqsClient.receiveMessage(request).getMessages(); | 182 | return sqsClient.receiveMessage(request).getMessages(); |
@@ -37,6 +37,7 @@ import org.thingsboard.server.queue.TbQueueProducer; | @@ -37,6 +37,7 @@ import org.thingsboard.server.queue.TbQueueProducer; | ||
37 | import org.thingsboard.server.queue.common.DefaultTbQueueMsg; | 37 | import org.thingsboard.server.queue.common.DefaultTbQueueMsg; |
38 | 38 | ||
39 | import java.util.Map; | 39 | import java.util.Map; |
40 | +import java.util.UUID; | ||
40 | import java.util.concurrent.ConcurrentHashMap; | 41 | import java.util.concurrent.ConcurrentHashMap; |
41 | import java.util.concurrent.Executors; | 42 | import java.util.concurrent.Executors; |
42 | 43 | ||
@@ -80,7 +81,9 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr | @@ -80,7 +81,9 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr | ||
80 | sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); | 81 | sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); |
81 | sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg))); | 82 | sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg))); |
82 | 83 | ||
83 | - sendMsgRequest.withMessageGroupId(msg.getKey().toString()); | 84 | + sendMsgRequest.withMessageGroupId(tpi.getTopic()); |
85 | + sendMsgRequest.withMessageDeduplicationId(UUID.randomUUID().toString()); | ||
86 | + | ||
84 | ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); | 87 | ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); |
85 | 88 | ||
86 | Futures.addCallback(future, new FutureCallback<SendMessageResult>() { | 89 | Futures.addCallback(future, new FutureCallback<SendMessageResult>() { |
@@ -55,7 +55,6 @@ public class TbAwsSqsQueueAttributes { | @@ -55,7 +55,6 @@ public class TbAwsSqsQueueAttributes { | ||
55 | @PostConstruct | 55 | @PostConstruct |
56 | private void init() { | 56 | private void init() { |
57 | defaultAttributes.put(QueueAttributeName.FifoQueue.toString(), "true"); | 57 | defaultAttributes.put(QueueAttributeName.FifoQueue.toString(), "true"); |
58 | - defaultAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true"); | ||
59 | 58 | ||
60 | coreAttributes = getConfigs(coreProperties); | 59 | coreAttributes = getConfigs(coreProperties); |
61 | ruleEngineAttributes = getConfigs(ruleEngineProperties); | 60 | ruleEngineAttributes = getConfigs(ruleEngineProperties); |
@@ -22,6 +22,7 @@ | @@ -22,6 +22,7 @@ | ||
22 | "azure-sb": "^0.11.1", | 22 | "azure-sb": "^0.11.1", |
23 | "long": "^4.0.0", | 23 | "long": "^4.0.0", |
24 | "uuid-parse": "^1.0.0", | 24 | "uuid-parse": "^1.0.0", |
25 | + "uuid-random": "^1.3.0", | ||
25 | "winston": "^3.0.0", | 26 | "winston": "^3.0.0", |
26 | "winston-daily-rotate-file": "^3.2.1" | 27 | "winston-daily-rotate-file": "^3.2.1" |
27 | }, | 28 | }, |
@@ -19,6 +19,7 @@ | @@ -19,6 +19,7 @@ | ||
19 | const config = require('config'), | 19 | const config = require('config'), |
20 | JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), | 20 | JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
21 | logger = require('../config/logger')._logger('awsSqsTemplate'); | 21 | logger = require('../config/logger')._logger('awsSqsTemplate'); |
22 | +const uuid = require('uuid-random'); | ||
22 | 23 | ||
23 | const requestTopic = config.get('request_topic'); | 24 | const requestTopic = config.get('request_topic'); |
24 | 25 | ||
@@ -29,7 +30,7 @@ const AWS = require('aws-sdk'); | @@ -29,7 +30,7 @@ const AWS = require('aws-sdk'); | ||
29 | const queueProperties = config.get('aws_sqs.queue_properties'); | 30 | const queueProperties = config.get('aws_sqs.queue_properties'); |
30 | const poolInterval = config.get('js.response_poll_interval'); | 31 | const poolInterval = config.get('js.response_poll_interval'); |
31 | 32 | ||
32 | -let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'}; | 33 | +let queueAttributes = {FifoQueue: 'true'}; |
33 | let sqsClient; | 34 | let sqsClient; |
34 | let requestQueueURL; | 35 | let requestQueueURL; |
35 | const queueUrls = new Map(); | 36 | const queueUrls = new Map(); |
@@ -51,7 +52,7 @@ function AwsSqsProducer() { | @@ -51,7 +52,7 @@ function AwsSqsProducer() { | ||
51 | queueUrls.set(responseTopic, responseQueueUrl); | 52 | queueUrls.set(responseTopic, responseQueueUrl); |
52 | } | 53 | } |
53 | 54 | ||
54 | - let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId}; | 55 | + let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: 'js_eval', MessageDeduplicationId: uuid()}; |
55 | 56 | ||
56 | return new Promise((resolve, reject) => { | 57 | return new Promise((resolve, reject) => { |
57 | sqsClient.sendMessage(params, function (err, data) { | 58 | sqsClient.sendMessage(params, function (err, data) { |
@@ -60,9 +60,11 @@ function PubSubProducer() { | @@ -60,9 +60,11 @@ function PubSubProducer() { | ||
60 | const topicList = await pubSubClient.getTopics(); | 60 | const topicList = await pubSubClient.getTopics(); |
61 | 61 | ||
62 | if (topicList) { | 62 | if (topicList) { |
63 | - topicList[0].forEach(topic => { | ||
64 | - topics.push(getName(topic.name)); | ||
65 | - }); | 63 | + if (topicList) { |
64 | + topicList[0].forEach(topic => { | ||
65 | + topics.push(getName(topic.name)); | ||
66 | + }); | ||
67 | + } | ||
66 | } | 68 | } |
67 | 69 | ||
68 | const subscriptionList = await pubSubClient.getSubscriptions(); | 70 | const subscriptionList = await pubSubClient.getSubscriptions(); |