Commit ef35ee8c304d743c55c053a0996ce84793daf40e

Authored by YevhenBondarenko
1 parent 214a6060

added queue settings to js-executor

... ... @@ -17,30 +17,34 @@
17 17 service-type: "TB_SERVICE_TYPE"
18 18 request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC"
19 19
  20 +js:
  21 + response_poll_interval: "REMOTE_JS_RESPONSE_POLL_INTERVAL_MS"
  22 +
20 23 kafka:
21 24 bootstrap:
22 25 # Kafka Bootstrap Servers
23 26 servers: "TB_KAFKA_SERVERS"
  27 + replication_factor: "TB_QUEUE_KAFKA_REPLICATION_FACTOR"
  28 + topic-properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
24 29
25 30 pubsub:
26 31 project_id: "TB_QUEUE_PUBSUB_PROJECT_ID"
27 32 service_account: "TB_QUEUE_PUBSUB_SERVICE_ACCOUNT"
  33 + queue-properties: "TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES"
28 34
29 35 aws_sqs:
30 36 access_key_id: "TB_QUEUE_AWS_SQS_ACCESS_KEY_ID"
31 37 secret_access_key: "TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY"
32 38 region: "TB_QUEUE_AWS_SQS_REGION"
  39 + queue-properties: "TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES"
33 40
34 41 rabbitmq:
35   - exchange_name: "TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME"
36 42 host: "TB_QUEUE_RABBIT_MQ_HOST"
37 43 port: "TB_QUEUE_RABBIT_MQ_PORT"
38 44 virtual_host: "TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST"
39 45 username: "TB_QUEUE_RABBIT_MQ_USERNAME"
40 46 password: "TB_QUEUE_RABBIT_MQ_PASSWORD"
41   - automatic_recovery_enabled: "TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED"
42   - connection_timeout: "TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT"
43   - handshake_timeout: "TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT"
  47 + queue-properties: "TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES"
44 48
45 49 logger:
46 50 level: "LOGGER_LEVEL"
... ...
... ... @@ -15,23 +15,31 @@
15 15 #
16 16
17 17 service-type: "kafka"
18   -request_topic: "js.eval.requests"
  18 +request_topic: "js_eval.requests"
  19 +
  20 +js:
  21 + response_poll_interval: "25"
19 22
20 23 kafka:
21 24 bootstrap:
22 25 # Kafka Bootstrap Servers
23 26 servers: "localhost:9092"
  27 + replication_factor: "1"
  28 + topic-properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600"
  29 +
  30 +pubsub:
  31 + queue-properties: "ackDeadlineInSec:30;messageRetentionInSec:604800"
  32 +
  33 +aws_sqs:
  34 + queue-properties: "VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800"
24 35
25 36 rabbitmq:
26   - exchange_name: ""
27 37 host: "localhost"
28 38 port: "5672"
29 39 virtual_host: "/"
30   - username: "YOUR_USERNAME"
31   - password: "YOUR_PASSWORD"
32   - automatic_recovery_enabled: "false"
33   - connection_timeout: "60000"
34   - handshake_timeout: "10000"
  40 + username: "admin"
  41 + password: "password"
  42 + queue-properties: "x-max-length-bytes:1048576000;x-message-ttl:604800000"
35 43
36 44
37 45 logger:
... ...
... ... @@ -14,7 +14,7 @@
14 14 "dependencies": {
15 15 "config": "^3.2.2",
16 16 "js-yaml": "^3.12.0",
17   - "kafkajs": "^1.11.0",
  17 + "kafkajs": "^1.12.0",
18 18 "@google-cloud/pubsub": "^1.7.1",
19 19 "aws-sdk": "^2.663.0",
20 20 "amqplib": "^0.5.5",
... ...
... ... @@ -26,10 +26,13 @@ const accessKeyId = config.get('aws_sqs.access_key_id');
26 26 const secretAccessKey = config.get('aws_sqs.secret_access_key');
27 27 const region = config.get('aws_sqs.region');
28 28 const AWS = require('aws-sdk');
  29 +const queueProperties = config.get('aws_sqs.queue-properties');
  30 +const poolInterval = config.get('js.response_poll_interval');
29 31
  32 +let queueAttributes = {FifoQueue: 'true', ContentBasedDeduplication: 'true'};
30 33 let sqsClient;
31   -let queueURL;
32   -let responseTopics = new Map();
  34 +let requestQueueURL;
  35 +let queueUrls = new Map();
33 36 let stopped = false;
34 37
35 38 function AwsSqsProducer() {
... ... @@ -41,11 +44,11 @@ function AwsSqsProducer() {
41 44 headers: headers
42 45 });
43 46
44   - let responseQueueUrl = responseTopics.get(responseTopic);
  47 + let responseQueueUrl = queueUrls.get(topicToSqsQueueName(responseTopic));
45 48
46 49 if (!responseQueueUrl) {
47 50 responseQueueUrl = await createQueue(responseTopic);
48   - responseTopics.set(responseTopic, responseQueueUrl);
  51 + queueUrls.set(responseTopic, responseQueueUrl);
49 52 }
50 53
51 54 let params = {MessageBody: msgBody, QueueUrl: responseQueueUrl, MessageGroupId: scriptId};
... ... @@ -69,13 +72,27 @@ function AwsSqsProducer() {
69 72
70 73 sqsClient = new AWS.SQS({apiVersion: '2012-11-05'});
71 74
72   - queueURL = await createQueue(requestTopic);
  75 + const queues = await getQueues();
  76 +
  77 + queues.forEach(queueUrl => {
  78 + const delimiterPosition = queueUrl.lastIndexOf('/');
  79 + const queueName = queueUrl.substring(delimiterPosition + 1);
  80 + queueUrls.set(queueName, queueUrl);
  81 + })
  82 +
  83 + parseQueueProperties();
  84 +
  85 + requestQueueURL = queueUrls.get(topicToSqsQueueName(requestTopic));
  86 + if (!requestQueueURL) {
  87 + requestQueueURL = await createQueue(requestTopic);
  88 + }
  89 +
73 90 const messageProcessor = new JsInvokeMessageProcessor(new AwsSqsProducer());
74 91
75 92 const params = {
76 93 MaxNumberOfMessages: 10,
77   - QueueUrl: queueURL,
78   - WaitTimeSeconds: 0.025
  94 + QueueUrl: requestQueueURL,
  95 + WaitTimeSeconds: poolInterval / 1000
79 96 };
80 97 while (!stopped) {
81 98 const messages = await new Promise((resolve, reject) => {
... ... @@ -100,7 +117,7 @@ function AwsSqsProducer() {
100 117 });
101 118
102 119 const deleteBatch = {
103   - QueueUrl: queueURL,
  120 + QueueUrl: requestQueueURL,
104 121 Entries: entries
105 122 };
106 123 sqsClient.deleteMessageBatch(deleteBatch, function (err, data) {
... ... @@ -120,14 +137,9 @@ function AwsSqsProducer() {
120 137 })();
121 138
122 139 function createQueue(topic) {
123   - let queueName = topic.replace(/\./g, '_') + '.fifo';
124   - let queueParams = {
125   - QueueName: queueName, Attributes: {
126   - FifoQueue: 'true',
127   - ContentBasedDeduplication: 'true'
  140 + let queueName = topicToSqsQueueName(topic);
  141 + let queueParams = {QueueName: queueName, Attributes: queueAttributes};
128 142
129   - }
130   - };
131 143 return new Promise((resolve, reject) => {
132 144 sqsClient.createQueue(queueParams, function (err, data) {
133 145 if (err) {
... ... @@ -139,6 +151,30 @@ function createQueue(topic) {
139 151 });
140 152 }
141 153
  154 +function getQueues() {
  155 + return new Promise((resolve, reject) => {
  156 + sqsClient.listQueues(function (err, data) {
  157 + if (err) {
  158 + reject(err);
  159 + } else {
  160 + resolve(data.QueueUrls);
  161 + }
  162 + });
  163 + });
  164 +}
  165 +
  166 +function topicToSqsQueueName(topic) {
  167 + return topic.replace(/\./g, '_') + '.fifo';
  168 +}
  169 +
  170 +function parseQueueProperties() {
  171 + const props = queueProperties.split(';');
  172 + props.forEach(p => {
  173 + const delimiterPosition = p.indexOf(':');
  174 + queueAttributes[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
  175 + });
  176 +}
  177 +
142 178 process.on('exit', () => {
143 179 stopped = true;
144 180 logger.info('Aws Sqs client stopped.');
... ...
... ... @@ -19,13 +19,28 @@ const config = require('config'),
19 19 JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
20 20 logger = require('../config/logger')._logger('kafkaTemplate'),
21 21 KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator;
  22 +const replicationFactor = config.get('kafka.replication_factor');
  23 +const topicProperties = config.get('kafka.topic-properties');
22 24
23 25 let kafkaClient;
  26 +let kafkaAdmin;
24 27 let consumer;
25 28 let producer;
26 29
  30 +const topics = [];
  31 +const configEntries = [];
  32 +
27 33 function KafkaProducer() {
28 34 this.send = async (responseTopic, scriptId, rawResponse, headers) => {
  35 +
  36 + if (!topics.includes(responseTopic)) {
  37 + let createResponseTopicResult = await createTopic(responseTopic);
  38 + topics.push(responseTopic);
  39 + if (createResponseTopicResult) {
  40 + logger.info('Created new topic: %s', requestTopic);
  41 + }
  42 + }
  43 +
29 44 let headersData = headers.data;
30 45 headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)]));
31 46 return producer.send(
... ... @@ -47,10 +62,10 @@ function KafkaProducer() {
47 62 logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
48 63
49 64 const kafkaBootstrapServers = config.get('kafka.bootstrap.servers');
50   - const kafkaRequestTopic = config.get('request_topic');
  65 + const requestTopic = config.get('request_topic');
51 66
52 67 logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
53   - logger.info('Kafka Requests Topic: %s', kafkaRequestTopic);
  68 + logger.info('Kafka Requests Topic: %s', requestTopic);
54 69
55 70 kafkaClient = new Kafka({
56 71 brokers: kafkaBootstrapServers.split(','),
... ... @@ -58,12 +73,23 @@ function KafkaProducer() {
58 73 logCreator: KafkaJsWinstonLogCreator
59 74 });
60 75
  76 + parseTopicProperties();
  77 +
  78 + kafkaAdmin = kafkaClient.admin();
  79 + await kafkaAdmin.connect();
  80 +
  81 + let createRequestTopicResult = await createTopic(requestTopic);
  82 +
  83 + if (createRequestTopicResult) {
  84 + logger.info('Created new topic: %s', requestTopic);
  85 + }
  86 +
61 87 consumer = kafkaClient.consumer({groupId: 'js-executor-group'});
62 88 producer = kafkaClient.producer();
63 89 const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer());
64 90 await consumer.connect();
65 91 await producer.connect();
66   - await consumer.subscribe({topic: kafkaRequestTopic});
  92 + await consumer.subscribe({topic: requestTopic});
67 93
68 94 logger.info('Started ThingsBoard JavaScript Executor Microservice.');
69 95 await consumer.run({
... ... @@ -90,12 +116,37 @@ function KafkaProducer() {
90 116 }
91 117 })();
92 118
  119 +function createTopic(topic) {
  120 + return kafkaAdmin.createTopics({
  121 + topics: [{
  122 + topic: topic,
  123 + replicationFactor: replicationFactor,
  124 + configEntries: configEntries
  125 + }]
  126 + });
  127 +}
  128 +
  129 +function parseTopicProperties() {
  130 + const props = topicProperties.split(';');
  131 + props.forEach(p => {
  132 + const delimiterPosition = p.indexOf(':');
  133 + configEntries.push({name: p.substring(0, delimiterPosition), value: p.substring(delimiterPosition + 1)});
  134 + });
  135 +}
  136 +
93 137 process.on('exit', () => {
94 138 exit(0);
95 139 });
96 140
97 141 async function exit(status) {
98 142 logger.info('Exiting with status: %d ...', status);
  143 +
  144 + if (kafkaAdmin) {
  145 + logger.info('Stopping Kafka Admin...');
  146 + await kafkaAdmin.disconnect();
  147 + logger.info('Kafka Admin stopped.');
  148 + }
  149 +
99 150 if (consumer) {
100 151 logger.info('Stopping Kafka Consumer...');
101 152 let _consumer = consumer;
... ...
... ... @@ -24,11 +24,21 @@ const {PubSub} = require('@google-cloud/pubsub');
24 24 const projectId = config.get('pubsub.project_id');
25 25 const credentials = JSON.parse(config.get('pubsub.service_account'));
26 26 const requestTopic = config.get('request_topic');
  27 +const queueProperties = config.get('pubsub.queue-properties');
27 28
28 29 let pubSubClient;
29 30
  31 +const topics = [];
  32 +const subscriptions = [];
  33 +let queueProps = [];
  34 +
30 35 function PubSubProducer() {
31 36 this.send = async (responseTopic, scriptId, rawResponse, headers) => {
  37 +
  38 + if (!(subscriptions.includes(responseTopic) && topics.includes(requestTopic))) {
  39 + await createTopic(requestTopic);
  40 + }
  41 +
32 42 let data = JSON.stringify(
33 43 {
34 44 key: scriptId,
... ... @@ -45,6 +55,28 @@ function PubSubProducer() {
45 55 logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
46 56 pubSubClient = new PubSub({projectId: projectId, credentials: credentials});
47 57
  58 + parseQueueProperties();
  59 +
  60 + const topicList = await pubSubClient.getTopics();
  61 +
  62 + if (topicList) {
  63 + topicList[0].forEach(topic => {
  64 + topics.push(getName(topic.name));
  65 + });
  66 + }
  67 +
  68 + const subscriptionList = await pubSubClient.getSubscriptions();
  69 +
  70 + if (subscriptionList) {
  71 + topicList[0].forEach(sub => {
  72 + subscriptions.push(getName(sub.name));
  73 + });
  74 + }
  75 +
  76 + if (!(subscriptions.includes(requestTopic) && topics.includes(requestTopic))) {
  77 + await createTopic(requestTopic);
  78 + }
  79 +
48 80 const subscription = pubSubClient.subscription(requestTopic);
49 81
50 82 const messageProcessor = new JsInvokeMessageProcessor(new PubSubProducer());
... ... @@ -64,6 +96,36 @@ function PubSubProducer() {
64 96 }
65 97 })();
66 98
  99 +async function createTopic(topic) {
  100 + if (!topics.includes(topic)) {
  101 + await pubSubClient.createTopic(topic);
  102 + topics.push(topic);
  103 + logger.info('Created new Pub/Sub topic: %s', topic);
  104 + }
  105 + await createSubscription(topic)
  106 +}
  107 +
  108 +async function createSubscription(topic) {
  109 + if (!subscriptions.includes(topic)) {
  110 + await pubSubClient.topic(topic).createSubscription(topic);
  111 + subscriptions.push(topic);
  112 + logger.info('Created new Pub/Sub subscription: %s', topic);
  113 + }
  114 +}
  115 +
  116 +function parseQueueProperties() {
  117 + const props = queueProperties.split(';');
  118 + props.forEach(p => {
  119 + const delimiterPosition = p.indexOf(':');
  120 + queueProps[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
  121 + });
  122 +}
  123 +
  124 +function getName(fullName) {
  125 + const delimiterPosition = fullName.lastIndexOf('/');
  126 + return fullName.substring(delimiterPosition + 1);
  127 +}
  128 +
67 129 process.on('exit', () => {
68 130 exit(0);
69 131 });
... ...
... ... @@ -21,7 +21,17 @@ const config = require('config'),
21 21 logger = require('../config/logger')._logger('rabbitmqTemplate');
22 22
23 23 const requestTopic = config.get('request_topic');
  24 +const host = config.get('rabbitmq.host');
  25 +const port = config.get('rabbitmq.port');
  26 +const vhost = config.get('rabbitmq.virtual_host');
  27 +const username = config.get('rabbitmq.username');
  28 +const password = config.get('rabbitmq.password');
  29 +const queueProperties = config.get('rabbitmq.queue-properties');
  30 +const poolInterval = config.get('js.response_poll_interval');
  31 +
24 32 const amqp = require('amqplib/callback_api');
  33 +
  34 +let queueParams = {durable: false, exclusive: false, autoDelete: false};
25 35 let connection;
26 36 let channel;
27 37 let stopped = false;
... ... @@ -58,10 +68,11 @@ function RabbitMqProducer() {
58 68 (async () => {
59 69 try {
60 70 logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
  71 + const url = `amqp://${host}:${port}${vhost}`;
61 72
62   - amqp.credentials.amqplain('admin', 'password');
  73 + amqp.credentials.amqplain(username, password);
63 74 connection = await new Promise((resolve, reject) => {
64   - amqp.connect('amqp://localhost:5672/', function (err, connection) {
  75 + amqp.connect(url, function (err, connection) {
65 76 if (err) {
66 77 reject(err);
67 78 } else {
... ... @@ -80,6 +91,8 @@ function RabbitMqProducer() {
80 91 });
81 92 });
82 93
  94 + parseQueueProperties();
  95 +
83 96 await createQueue(requestTopic);
84 97
85 98 const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer());
... ... @@ -98,6 +111,8 @@ function RabbitMqProducer() {
98 111 if (message) {
99 112 messageProcessor.onJsInvokeMessage(message.content.toString('utf8'));
100 113 channel.ack(message);
  114 + } else {
  115 + await sleep(poolInterval);
101 116 }
102 117 }
103 118 } catch (e) {
... ... @@ -107,10 +122,17 @@ function RabbitMqProducer() {
107 122 }
108 123 })();
109 124
  125 +function parseQueueProperties() {
  126 + const props = queueProperties.split(';');
  127 + props.forEach(p => {
  128 + const delimiterPosition = p.indexOf(':');
  129 + queueParams[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
  130 + });
  131 +}
  132 +
110 133 function createQueue(topic) {
111   - let params = {durable: false};
112 134 return new Promise((resolve, reject) => {
113   - channel.assertQueue(topic, params, function (err, data) {
  135 + channel.assertQueue(topic, queueParams, function (err, data) {
114 136 if (err) {
115 137 reject(err);
116 138 } else {
... ... @@ -120,6 +142,12 @@ function createQueue(topic) {
120 142 });
121 143 }
122 144
  145 +function sleep(ms) {
  146 + return new Promise((resolve) => {
  147 + setTimeout(resolve, ms);
  148 + });
  149 +}
  150 +
123 151 process.on('exit', () => {
124 152 exit(0);
125 153 });
... ... @@ -146,4 +174,4 @@ async function exit(status) {
146 174 } else {
147 175 process.exit(status);
148 176 }
149   -}
\ No newline at end of file
  177 +}
... ...