Commit 9d82899f049b44df2c9a2d51f584d840427c00be

Authored by YevhenBondarenko
Committed by Andrew Shvayka
1 parent 10a3f269

raised kafkajs version and improvements kafkaTemplate.js

... ... @@ -14,7 +14,7 @@
14 14 "dependencies": {
15 15 "config": "^3.2.2",
16 16 "js-yaml": "^3.12.0",
17   - "kafkajs": "^1.12.0",
  17 + "kafkajs": "^1.14.0",
18 18 "@google-cloud/pubsub": "^1.7.1",
19 19 "aws-sdk": "^2.663.0",
20 20 "amqplib": "^0.5.5",
... ...
... ... @@ -27,20 +27,10 @@ let kafkaAdmin;
27 27 let consumer;
28 28 let producer;
29 29
30   -const topics = [];
31 30 const configEntries = [];
32 31
33 32 function KafkaProducer() {
34 33 this.send = async (responseTopic, scriptId, rawResponse, headers) => {
35   -
36   - if (!topics.includes(responseTopic)) {
37   - let createResponseTopicResult = await createTopic(responseTopic, 1);
38   - topics.push(responseTopic);
39   - if (createResponseTopicResult) {
40   - logger.info('Created new topic: %s', requestTopic);
41   - }
42   - }
43   -
44 34 return producer.send(
45 35 {
46 36 topic: responseTopic,
... ... @@ -99,10 +89,13 @@ function KafkaProducer() {
99 89 }
100 90 }
101 91
102   - let createRequestTopicResult = await createTopic(requestTopic, partitions);
  92 + let topics = await kafkaAdmin.listTopics();
103 93
104   - if (createRequestTopicResult) {
105   - logger.info('Created new topic: %s', requestTopic);
  94 + if (!topics.includes(requestTopic)) {
  95 + let createRequestTopicResult = await createTopic(requestTopic, partitions);
  96 + if (createRequestTopicResult) {
  97 + logger.info('Created new topic: %s', requestTopic);
  98 + }
106 99 }
107 100
108 101 consumer = kafkaClient.consumer({groupId: 'js-executor-group'});
... ...