Commit ce01e3294921794b8939b41faacf63a326a39f0a

Authored by Andrew Shvayka
Committed by GitHub
2 parents 8d5c38b7 00e32ee8

Merge pull request #2716 from YevhenBondarenko/develop/2.5-js-executor

[2.5] js executor
Showing 23 changed files with 134 additions and 83 deletions
... ... @@ -67,6 +67,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
67 67
68 68 try {
69 69 QueueDescription queueDescription = new QueueDescription(topic);
  70 + queueDescription.setRequiresDuplicateDetection(false);
70 71 setQueueConfigs(queueDescription);
71 72
72 73 client.createQueue(queueDescription);
... ...
... ... @@ -58,7 +58,7 @@ public class TbServiceBusConsumerTemplate<T extends TbQueueMsg> extends Abstract
58 58 private final Gson gson = new Gson();
59 59
60 60 private Set<CoreMessageReceiver> receivers;
61   - private Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
  61 + private final Map<CoreMessageReceiver, Collection<MessageWithDeliveryTag>> pendingMessages = new ConcurrentHashMap<>();
62 62 private volatile int messagesPerQueue;
63 63
64 64 public TbServiceBusConsumerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String topic, TbQueueMsgDecoder<T> decoder) {
... ...
... ... @@ -33,6 +33,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
33 33 import java.util.HashMap;
34 34 import java.util.Map;
35 35 import java.util.concurrent.CompletableFuture;
  36 +import java.util.concurrent.ConcurrentHashMap;
36 37 import java.util.concurrent.ExecutorService;
37 38 import java.util.concurrent.Executors;
38 39
... ... @@ -42,14 +43,14 @@ public class TbServiceBusProducerTemplate<T extends TbQueueMsg> implements TbQue
42 43 private final Gson gson = new Gson();
43 44 private final TbQueueAdmin admin;
44 45 private final TbServiceBusSettings serviceBusSettings;
45   - private final Map<String, QueueClient> clients = new HashMap<>();
46   - private ExecutorService executorService;
  46 + private final Map<String, QueueClient> clients = new ConcurrentHashMap<>();
  47 + private final ExecutorService executorService;
47 48
48 49 public TbServiceBusProducerTemplate(TbQueueAdmin admin, TbServiceBusSettings serviceBusSettings, String defaultTopic) {
49 50 this.admin = admin;
50 51 this.defaultTopic = defaultTopic;
51 52 this.serviceBusSettings = serviceBusSettings;
52   - executorService = Executors.newSingleThreadExecutor();
  53 + executorService = Executors.newCachedThreadPool();
53 54 }
54 55
55 56 @Override
... ...
... ... @@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
21 21 import org.apache.kafka.clients.consumer.ConsumerRecord;
22 22 import org.apache.kafka.clients.consumer.ConsumerRecords;
23 23 import org.apache.kafka.clients.consumer.KafkaConsumer;
24   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
25 24 import org.thingsboard.server.queue.TbQueueAdmin;
26 25 import org.thingsboard.server.queue.TbQueueMsg;
27 26 import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
... ... @@ -32,7 +31,6 @@ import java.util.ArrayList;
32 31 import java.util.Collections;
33 32 import java.util.List;
34 33 import java.util.Properties;
35   -import java.util.stream.Collectors;
36 34
37 35 /**
38 36 * Created by ashvayka on 24.09.18.
... ... @@ -69,7 +67,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
69 67 }
70 68
71 69 @Override
72   - protected void doSubscribe( List<String> topicNames) {
  70 + protected void doSubscribe(List<String> topicNames) {
73 71 topicNames.forEach(admin::createTopicIfNotExists);
74 72 consumer.subscribe(topicNames);
75 73 }
... ...
... ... @@ -15,6 +15,7 @@
15 15 */
16 16 package org.thingsboard.server.queue.pubsub;
17 17
  18 +import com.google.api.gax.rpc.AlreadyExistsException;
18 19 import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
19 20 import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
20 21 import com.google.cloud.pubsub.v1.TopicAdminClient;
... ... @@ -24,9 +25,9 @@ import com.google.pubsub.v1.ListSubscriptionsRequest;
24 25 import com.google.pubsub.v1.ListTopicsRequest;
25 26 import com.google.pubsub.v1.ProjectName;
26 27 import com.google.pubsub.v1.ProjectSubscriptionName;
27   -import com.google.pubsub.v1.ProjectTopicName;
28 28 import com.google.pubsub.v1.Subscription;
29 29 import com.google.pubsub.v1.Topic;
  30 +import com.google.pubsub.v1.TopicName;
30 31 import lombok.extern.slf4j.Slf4j;
31 32 import org.thingsboard.server.queue.TbQueueAdmin;
32 33
... ... @@ -103,7 +104,10 @@ public class TbPubSubAdmin implements TbQueueAdmin {
103 104
104 105 @Override
105 106 public void createTopicIfNotExists(String partition) {
106   - ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), partition);
  107 + TopicName topicName = TopicName.newBuilder()
  108 + .setTopic(partition)
  109 + .setProject(pubSubSettings.getProjectId())
  110 + .build();
107 111
108 112 if (topicSet.contains(topicName.toString())) {
109 113 createSubscriptionIfNotExists(partition, topicName);
... ... @@ -121,13 +125,18 @@ public class TbPubSubAdmin implements TbQueueAdmin {
121 125 }
122 126 }
123 127
124   - topicAdminClient.createTopic(topicName);
125   - topicSet.add(topicName.toString());
126   - log.info("Created new topic: [{}]", topicName.toString());
  128 + try {
  129 + topicAdminClient.createTopic(topicName);
  130 + log.info("Created new topic: [{}]", topicName.toString());
  131 + } catch (AlreadyExistsException e) {
  132 + log.info("[{}] Topic already exist.", topicName.toString());
  133 + } finally {
  134 + topicSet.add(topicName.toString());
  135 + }
127 136 createSubscriptionIfNotExists(partition, topicName);
128 137 }
129 138
130   - private void createSubscriptionIfNotExists(String partition, ProjectTopicName topicName) {
  139 + private void createSubscriptionIfNotExists(String partition, TopicName topicName) {
131 140 ProjectSubscriptionName subscriptionName =
132 141 ProjectSubscriptionName.of(pubSubSettings.getProjectId(), partition);
133 142
... ... @@ -153,9 +162,14 @@ public class TbPubSubAdmin implements TbQueueAdmin {
153 162 setAckDeadline(subscriptionBuilder);
154 163 setMessageRetention(subscriptionBuilder);
155 164
156   - subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
157   - subscriptionSet.add(subscriptionName.toString());
158   - log.info("Created new subscription: [{}]", subscriptionName.toString());
  165 + try {
  166 + subscriptionAdminClient.createSubscription(subscriptionBuilder.build());
  167 + log.info("Created new subscription: [{}]", subscriptionName.toString());
  168 + } catch (AlreadyExistsException e) {
  169 + log.info("[{}] Subscription already exist.", subscriptionName.toString());
  170 + } finally {
  171 + subscriptionSet.add(subscriptionName.toString());
  172 + }
159 173 }
160 174
161 175 private void setAckDeadline(Subscription.Builder builder) {
... ...
... ... @@ -15,7 +15,6 @@
15 15 */
16 16 package org.thingsboard.server.queue.pubsub;
17 17
18   -import com.amazonaws.services.sqs.model.Message;
19 18 import com.google.api.core.ApiFuture;
20 19 import com.google.api.core.ApiFutures;
21 20 import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
... ... @@ -31,13 +30,10 @@ import com.google.pubsub.v1.PullResponse;
31 30 import com.google.pubsub.v1.ReceivedMessage;
32 31 import lombok.extern.slf4j.Slf4j;
33 32 import org.springframework.util.CollectionUtils;
34   -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
35 33 import org.thingsboard.server.queue.TbQueueAdmin;
36   -import org.thingsboard.server.queue.TbQueueConsumer;
37 34 import org.thingsboard.server.queue.TbQueueMsg;
38 35 import org.thingsboard.server.queue.TbQueueMsgDecoder;
39 36 import org.thingsboard.server.queue.common.AbstractParallelTbQueueConsumerTemplate;
40   -import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;
41 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
42 38
43 39 import java.io.IOException;
... ... @@ -49,9 +45,6 @@ import java.util.Objects;
49 45 import java.util.Set;
50 46 import java.util.concurrent.CopyOnWriteArrayList;
51 47 import java.util.concurrent.ExecutionException;
52   -import java.util.concurrent.ExecutorService;
53   -import java.util.concurrent.Executors;
54   -import java.util.concurrent.TimeUnit;
55 48 import java.util.stream.Collectors;
56 49
57 50 @Slf4j
... ... @@ -136,7 +129,7 @@ public class TbPubSubConsumerTemplate<T extends TbQueueMsg> extends AbstractPara
136 129 PullRequest pullRequest =
137 130 PullRequest.newBuilder()
138 131 .setMaxMessages(messagesPerTopic)
139   - .setReturnImmediately(false) // return immediately if messages are not available
  132 +// .setReturnImmediately(false) // return immediately if messages are not available
140 133 .setSubscription(subscriptionName)
141 134 .build();
142 135
... ...
... ... @@ -49,7 +49,7 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
49 49
50 50 private final Map<String, Publisher> publisherMap = new ConcurrentHashMap<>();
51 51
52   - private ExecutorService pubExecutor = Executors.newCachedThreadPool();
  52 + private final ExecutorService pubExecutor = Executors.newCachedThreadPool();
53 53
54 54 public TbPubSubProducerTemplate(TbQueueAdmin admin, TbPubSubSettings pubSubSettings, String defaultTopic) {
55 55 this.defaultTopic = defaultTopic;
... ... @@ -124,8 +124,8 @@ public class TbPubSubProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
124 124 publisherMap.put(topic, publisher);
125 125 return publisher;
126 126 } catch (IOException e) {
127   - log.error("Failed to create topic [{}].", topic, e);
128   - throw new RuntimeException("Failed to create topic.", e);
  127 + log.error("Failed to create Publisher for the topic [{}].", topic, e);
  128 + throw new RuntimeException("Failed to create Publisher for the topic.", e);
129 129 }
130 130 }
131 131
... ...
... ... @@ -27,13 +27,11 @@ import java.util.concurrent.TimeoutException;
27 27 @Slf4j
28 28 public class TbRabbitMqAdmin implements TbQueueAdmin {
29 29
30   - private final TbRabbitMqSettings rabbitMqSettings;
31 30 private final Channel channel;
32 31 private final Connection connection;
33 32 private final Map<String, Object> arguments;
34 33
35 34 public TbRabbitMqAdmin(TbRabbitMqSettings rabbitMqSettings, Map<String, Object> arguments) {
36   - this.rabbitMqSettings = rabbitMqSettings;
37 35 this.arguments = arguments;
38 36
39 37 try {
... ...
... ... @@ -30,6 +30,8 @@ import org.thingsboard.server.queue.TbQueueProducer;
30 30 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
31 31
32 32 import java.io.IOException;
  33 +import java.util.Set;
  34 +import java.util.concurrent.ConcurrentHashMap;
33 35 import java.util.concurrent.Executors;
34 36 import java.util.concurrent.TimeoutException;
35 37
... ... @@ -39,10 +41,12 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
39 41 private final Gson gson = new Gson();
40 42 private final TbQueueAdmin admin;
41 43 private final TbRabbitMqSettings rabbitMqSettings;
42   - private ListeningExecutorService producerExecutor;
  44 + private final ListeningExecutorService producerExecutor;
43 45 private final Channel channel;
44 46 private final Connection connection;
45 47
  48 + private final Set<TopicPartitionInfo> topics = ConcurrentHashMap.newKeySet();
  49 +
46 50 public TbRabbitMqProducerTemplate(TbQueueAdmin admin, TbRabbitMqSettings rabbitMqSettings, String defaultTopic) {
47 51 this.admin = admin;
48 52 this.defaultTopic = defaultTopic;
... ... @@ -75,6 +79,7 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
75 79
76 80 @Override
77 81 public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
  82 + createTopicIfNotExist(tpi);
78 83 AMQP.BasicProperties properties = new AMQP.BasicProperties();
79 84 try {
80 85 channel.basicPublish(rabbitMqSettings.getExchangeName(), tpi.getFullTopicName(), properties, gson.toJson(new DefaultTbQueueMsg(msg)).getBytes());
... ... @@ -110,4 +115,11 @@ public class TbRabbitMqProducerTemplate<T extends TbQueueMsg> implements TbQueue
110 115 }
111 116 }
112 117
  118 + private void createTopicIfNotExist(TopicPartitionInfo tpi) {
  119 + if (topics.contains(tpi)) {
  120 + return;
  121 + }
  122 + admin.createTopicIfNotExists(tpi.getFullTopicName());
  123 + topics.add(tpi);
  124 + }
113 125 }
... ...
... ... @@ -84,10 +84,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractPara
84 84
85 85 @Override
86 86 protected List<Message> doPoll(long durationInMillis) {
87   - if (!pendingMessages.isEmpty()) {
88   - log.warn("Present {} non committed messages.", pendingMessages.size());
89   - return Collections.emptyList();
90   - }
91 87 int duration = (int) TimeUnit.MILLISECONDS.toSeconds(durationInMillis);
92 88 List<ListenableFuture<List<Message>>> futureList = queueUrls
93 89 .stream()
... ... @@ -145,7 +141,6 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractPara
145 141 ReceiveMessageRequest request = new ReceiveMessageRequest();
146 142 request
147 143 .withWaitTimeSeconds(waitTimeSeconds)
148   - .withMessageAttributeNames("headers")
149 144 .withQueueUrl(url)
150 145 .withMaxNumberOfMessages(MAX_NUM_MSGS);
151 146 return sqsClient.receiveMessage(request).getMessages();
... ...
... ... @@ -37,6 +37,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
37 37 import org.thingsboard.server.queue.common.DefaultTbQueueMsg;
38 38
39 39 import java.util.Map;
  40 +import java.util.UUID;
40 41 import java.util.concurrent.ConcurrentHashMap;
41 42 import java.util.concurrent.Executors;
42 43
... ... @@ -80,7 +81,9 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
80 81 sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName()));
81 82 sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg)));
82 83
83   - sendMsgRequest.withMessageGroupId(msg.getKey().toString());
  84 + sendMsgRequest.withMessageGroupId(tpi.getTopic());
  85 + sendMsgRequest.withMessageDeduplicationId(UUID.randomUUID().toString());
  86 +
84 87 ListenableFuture<SendMessageResult> future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest));
85 88
86 89 Futures.addCallback(future, new FutureCallback<SendMessageResult>() {
... ...
... ... @@ -55,7 +55,6 @@ public class TbAwsSqsQueueAttributes {
55 55 @PostConstruct
56 56 private void init() {
57 57 defaultAttributes.put(QueueAttributeName.FifoQueue.toString(), "true");
58   - defaultAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true");
59 58
60 59 coreAttributes = getConfigs(coreProperties);
61 60 ruleEngineAttributes = getConfigs(ruleEngineProperties);
... ...
1   -
  1 +TB_QUEUE_TYPE=kafka
2 2 REMOTE_JS_EVAL_REQUEST_TOPIC=js_eval.requests
3 3 TB_KAFKA_SERVERS=kafka:9092
4 4 LOGGER_LEVEL=info
... ...
... ... @@ -14,7 +14,7 @@
14 14 # limitations under the License.
15 15 #
16 16
17   -service-type: "TB_SERVICE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
  17 +queue_type: "TB_QUEUE_TYPE" #kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
18 18 request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC"
19 19
20 20 js:
... ... @@ -25,18 +25,18 @@ kafka:
25 25 # Kafka Bootstrap Servers
26 26 servers: "TB_KAFKA_SERVERS"
27 27 replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR"
28   - topic-properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
  28 + topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
29 29
30 30 pubsub:
31 31 project_id: "TB_QUEUE_PUBSUB_PROJECT_ID"
32 32 service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT"
33   - queue-properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES"
  33 + queue_properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES"
34 34
35 35 aws_sqs:
36 36 access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID"
37 37 secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY"
38 38 region: "TB_QUEUE_AWS_SQS_REGION"
39   - queue-properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES"
  39 + queue_properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES"
40 40
41 41 rabbitmq:
42 42 host: "TB_QUEUE_RABBIT_MQ_HOST"
... ... @@ -44,14 +44,14 @@ rabbitmq:
44 44 virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST"
45 45 username: "TB_QUEUE_RABBIT_MQ_USERNAME"
46 46 password: "TB_QUEUE_RABBIT_MQ_PASSWORD"
47   - queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES"
  47 + queue_properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES"
48 48
49 49 service_bus:
50 50 namespace_name: "TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME"
51 51 sas_key_name: "TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME"
52 52 sas_key: "TB_QUEUE_SERVICE_BUS_SAS_KEY"
53 53 max_messages: "TB_QUEUE_SERVICE_BUS_MAX_MESSAGES"
54   - queue-properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES"
  54 + queue_properties: "TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES"
55 55
56 56 logger:
57 57 level: "LOGGER_LEVEL"
... ...
... ... @@ -14,7 +14,7 @@
14 14 # limitations under the License.
15 15 #
16 16
17   -service-type: "kafka"
  17 +queue_type: "kafka"
18 18 request_topic: "js_eval.requests"
19 19
20 20 js:
... ... @@ -25,13 +25,13 @@ kafka:
25 25 # Kafka Bootstrap Servers
26 26 servers: "localhost:9092"
27 27 replication_factor: "1"
28   - topic-properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
  28 + topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
29 29
30 30 pubsub:
31   - queue-properties: "ackDeadlineInSec:30;messageRetentionInSec:604800"
  31 + queue_properties: "ackDeadlineInSec:30;messageRetentionInSec:604800"
32 32
33 33 aws_sqs:
34   - queue-properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800"
  34 + queue_properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800"
35 35
36 36 rabbitmq:
37 37 host: "localhost"
... ... @@ -39,10 +39,10 @@ rabbitmq:
39 39 virtual_host: "/"
40 40 username: "admin"
41 41 password: "password"
42   - queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000"
  42 + queue_properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000"
43 43
44 44 service_bus:
45   - queue-properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800"
  45 + queue_properties: "lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800"
46 46
47 47 logger:
48 48 level: "info"
... ...
... ... @@ -22,6 +22,7 @@
22 22 "azure-sb": "^0.11.1",
23 23 "long": "^4.0.0",
24 24 "uuid-parse": "^1.0.0",
  25 + "uuid-random": "^1.3.0",
25 26 "winston": "^3.0.0",
26 27 "winston-daily-rotate-file": "^3.2.1"
27 28 },
... ...
... ... @@ -19,6 +19,7 @@
19 19 const config = require('config'),
20 20 JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
21 21 logger = require('../config/logger')._logger('awsSqsTemplate');
  22 +const uuid = require('uuid-random');
22 23
23 24 const requestTopic = config.get('request_topic');
24 25
... ... @@ -26,10 +27,10 @@ const accessKeyId = config.get('aws_sqs.access_key_id');
26 27 const secretAccessKey = config.get('aws_sqs.secret_access_key');
27 28 const region = config.get('aws_sqs.region');
28 29 const AWS = require('aws-sdk');
29   -const queueProperties = config.get('aws_sqs.queue-properties');
30   -const poolInterval = config.get('js.response_poll_interval');
  30 +const queueProperties = config.get('aws_sqs.queue_properties');
  31 +const pollInterval = config.get('js.response_poll_interval');
31 32
32   -let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'};
  33 +let queueAttributes = {FifoQueue: 'true'};
33 34 let sqsClient;
34 35 let requestQueueURL;
35 36 const queueUrls = new Map();
... ... @@ -51,7 +52,12 @@ function AwsSqsProducer() {
51 52 queueUrls.set(responseTopic, responseQueueUrl);
52 53 }
53 54
54   - let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId};
  55 + let params = {
  56 + MessageBody: msgBody,
  57 + QueueUrl: responseQueueUrl,
  58 + MessageGroupId: 'js_eval',
  59 + MessageDeduplicationId: uuid()
  60 + };
55 61
56 62 return new Promise((resolve, reject) => {
57 63 sqsClient.sendMessage(params, function (err, data) {
... ... @@ -74,11 +80,13 @@ function AwsSqsProducer() {
74 80
75 81 const queues = await getQueues();
76 82
77   - queues.forEach(queueUrl => {
78   - const delimiterPosition = queueUrl.lastIndexOf('/');
79   - const queueName = queueUrl.substring(delimiterPosition + 1);
80   - queueUrls.set(queueName, queueUrl);
81   - })
  83 + if (queues) {
  84 + queues.forEach(queueUrl => {
  85 + const delimiterPosition = queueUrl.lastIndexOf('/');
  86 + const queueName = queueUrl.substring(delimiterPosition + 1);
  87 + queueUrls.set(queueName, queueUrl);
  88 + });
  89 + }
82 90
83 91 parseQueueProperties();
84 92
... ... @@ -95,6 +103,7 @@ function AwsSqsProducer() {
95 103 WaitTimeSeconds: poolInterval / 1000
96 104 };
97 105 while (!stopped) {
  106 + let pollStartTs = new Date().getTime();
98 107 const messages = await new Promise((resolve, reject) => {
99 108 sqsClient.receiveMessage(params, function (err, data) {
100 109 if (err) {
... ... @@ -127,6 +136,11 @@ function AwsSqsProducer() {
127 136 //do nothing
128 137 }
129 138 });
  139 + } else {
  140 + let pollDuration = new Date().getTime() - pollStartTs;
  141 + if (pollDuration < pollInterval) {
  142 + await sleep(pollInterval - pollDuration);
  143 + }
130 144 }
131 145 }
132 146 } catch (e) {
... ... @@ -175,6 +189,12 @@ function parseQueueProperties() {
175 189 });
176 190 }
177 191
  192 +function sleep(ms) {
  193 + return new Promise((resolve) => {
  194 + setTimeout(resolve, ms);
  195 + });
  196 +}
  197 +
178 198 process.on('exit', () => {
179 199 stopped = true;
180 200 logger.info('Aws Sqs client stopped.');
... ...
... ... @@ -20,7 +20,7 @@ const config = require('config'),
20 20 logger = require('../config/logger')._logger('kafkaTemplate'),
21 21 KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator;
22 22 const replicationFactor = config.get('kafka.replication_factor');
23   -const topicProperties = config.get('kafka.topic-properties');
  23 +const topicProperties = config.get('kafka.topic_properties');
24 24
25 25 let kafkaClient;
26 26 let kafkaAdmin;
... ...
... ... @@ -24,7 +24,7 @@ const {PubSub} = require('@google-cloud/pubsub');
24 24 const projectId = config.get('pubsub.project_id');
25 25 const credentials = JSON.parse(config.get('pubsub.service_account'));
26 26 const requestTopic = config.get('request_topic');
27   -const queueProperties = config.get('pubsub.queue-properties');
  27 +const queueProperties = config.get('pubsub.queue_properties');
28 28
29 29 let pubSubClient;
30 30
... ... @@ -98,23 +98,32 @@ function PubSubProducer() {
98 98
99 99 async function createTopic(topic) {
100 100 if (!topics.includes(topic)) {
101   - await pubSubClient.createTopic(topic);
  101 + try {
  102 + await pubSubClient.createTopic(topic);
  103 + logger.info('Created new Pub/Sub topic: %s', topic);
  104 + } catch (e) {
  105 + logger.info('Pub/Sub topic already exists');
  106 + }
102 107 topics.push(topic);
103   - logger.info('Created new Pub/Sub topic: %s', topic);
104 108 }
105 109 await createSubscription(topic)
106 110 }
107 111
108 112 async function createSubscription(topic) {
109 113 if (!subscriptions.includes(topic)) {
110   - await pubSubClient.createSubscription(topic, topic, {
111   - topic: topic,
112   - subscription: topic,
113   - ackDeadlineSeconds: queueProps['ackDeadlineInSec'],
114   - messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']}
115   - });
  114 + try {
  115 + await pubSubClient.createSubscription(topic, topic, {
  116 + topic: topic,
  117 + subscription: topic,
  118 + ackDeadlineSeconds: queueProps['ackDeadlineInSec'],
  119 + messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']}
  120 + });
  121 + logger.info('Created new Pub/Sub subscription: %s', topic);
  122 + } catch (e) {
  123 + logger.info('Pub/Sub subscription already exists.');
  124 + }
  125 +
116 126 subscriptions.push(topic);
117   - logger.info('Created new Pub/Sub subscription: %s', topic);
118 127 }
119 128 }
120 129
... ...
... ... @@ -26,23 +26,23 @@ const port = config.get('rabbitmq.port');
26 26 const vhost = config.get('rabbitmq.virtual_host');
27 27 const username = config.get('rabbitmq.username');
28 28 const password = config.get('rabbitmq.password');
29   -const queueProperties = config.get('rabbitmq.queue-properties');
30   -const poolInterval = config.get('js.response_poll_interval');
  29 +const queueProperties = config.get('rabbitmq.queue_properties');
  30 +const pollInterval = config.get('js.response_poll_interval');
31 31
32 32 const amqp = require('amqplib/callback_api');
33 33
34   -let queueParams = {durable: false, exclusive: false, autoDelete: false};
  34 +let queueOptions = {durable: false, exclusive: false, autoDelete: false};
35 35 let connection;
36 36 let channel;
37 37 let stopped = false;
38   -const responseTopics = [];
  38 +let queues = [];
39 39
40 40 function RabbitMqProducer() {
41 41 this.send = async (responseTopic, scriptId, rawResponse, headers) => {
42 42
43   - if (!responseTopics.includes(responseTopic)) {
  43 + if (!queues.includes(responseTopic)) {
44 44 await createQueue(responseTopic);
45   - responseTopics.push(responseTopic);
  45 + queues.push(responseTopic);
46 46 }
47 47
48 48 let data = JSON.stringify(
... ... @@ -98,6 +98,7 @@ function RabbitMqProducer() {
98 98 const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer());
99 99
100 100 while (!stopped) {
  101 + let pollStartTs = new Date().getTime();
101 102 let message = await new Promise((resolve, reject) => {
102 103 channel.get(requestTopic, {}, function (err, msg) {
103 104 if (err) {
... ... @@ -112,7 +113,10 @@ function RabbitMqProducer() {
112 113 messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
113 114 channel.ack(message);
114 115 } else {
115   - await sleep(poolInterval);
  116 + let pollDuration = new Date().getTime() - pollStartTs;
  117 + if (pollDuration < pollInterval) {
  118 + await sleep(pollInterval - pollDuration);
  119 + }
116 120 }
117 121 }
118 122 } catch (e) {
... ... @@ -123,16 +127,18 @@ function RabbitMqProducer() {
123 127 })();
124 128
125 129 function parseQueueProperties() {
  130 + let args = {};
126 131 const props = queueProperties.split(';');
127 132 props.forEach(p => {
128 133 const delimiterPosition = p.indexOf(':');
129   - queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
  134 + args[p.substring(0, delimiterPosition)] = +p.substring(delimiterPosition + 1);
130 135 });
  136 + queueOptions['arguments'] = args;
131 137 }
132 138
133   -function createQueue(topic) {
  139 +async function createQueue(topic) {
134 140 return new Promise((resolve, reject) => {
135   - channel.assertQueue(topic, queueParams, function (err) {
  141 + channel.assertQueue(topic, queueOptions, function (err) {
136 142 if (err) {
137 143 reject(err);
138 144 } else {
... ...
... ... @@ -26,7 +26,7 @@ const requestTopic = config.get('request_topic');
26 26 const namespaceName = config.get('service_bus.namespace_name');
27 27 const sasKeyName = config.get('service_bus.sas_key_name');
28 28 const sasKey = config.get('service_bus.sas_key');
29   -const queueProperties = config.get('service_bus.queue-properties');
  29 +const queueProperties = config.get('service_bus.queue_properties');
30 30
31 31 let sbClient;
32 32 let receiverClient;
... ... @@ -140,6 +140,7 @@ function parseQueueProperties() {
140 140 properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
141 141 });
142 142 queueOptions = {
  143 + DuplicateDetection: 'false',
143 144 MaxSizeInMegabytes: properties['maxSizeInMb'],
144 145 DefaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`,
145 146 LockDuration: `PT${properties['lockDurationInSec']}S`
... ...
... ... @@ -16,7 +16,7 @@
16 16
17 17 const config = require('config'), logger = require('./config/logger')._logger('main');
18 18
19   -const serviceType = config.get('service-type');
  19 +const serviceType = config.get('queue_type');
20 20 switch (serviceType) {
21 21 case 'kafka':
22 22 logger.info('Starting kafka template.');
... ...
... ... @@ -95,7 +95,7 @@
95 95 <snakeyaml.version>1.25</snakeyaml.version>
96 96 <struts.version>1.3.10</struts.version>
97 97 <amazonaws.sqs.version>1.11.747</amazonaws.sqs.version>
98   - <pubsub.client.version>1.84.0</pubsub.client.version>
  98 + <pubsub.client.version>1.105.0</pubsub.client.version>
99 99 <azure-servicebus.version>3.2.0</azure-servicebus.version>
100 100 <passay.version>1.5.0</passay.version>
101 101 <ua-parser.version>1.4.3</ua-parser.version>
... ...