Commit a4e28ad94533610c4de0f855bad75dcd4460ac74

Authored by Sergey Matvienko
1 parent 2cba1e2f

js-executor fixed promises for each message for Kafka batches

@@ -180,19 +180,17 @@ JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseT @@ -180,19 +180,17 @@ JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseT
180 var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); 180 var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse);
181 var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); 181 var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8');
182 logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId); 182 logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId);
183 - this.producer.send(responseTopic, scriptId, rawResponse, headers);  
184 -//TODO put error msg for other queues implementation except Kafka  
185 -// .then(  
186 -// () => {  
187 -// logger.info('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId);  
188 -// },  
189 -// (err) => {  
190 -// if (err) {  
191 -// logger.error('[%s] Failed to send response to queue: %s', requestId, err.message);  
192 -// logger.error(err.stack);  
193 -// }  
194 -// }  
195 -// ); 183 + this.producer.send(responseTopic, scriptId, rawResponse, headers).then(
  184 + () => {
  185 + logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId);
  186 + },
  187 + (err) => {
  188 + if (err) {
  189 + logger.error('[%s] Failed to send response to queue: %s', requestId, err.message);
  190 + logger.error(err.stack);
  191 + }
  192 + }
  193 + );
196 } 194 }
197 195
198 JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) { 196 JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) {
@@ -36,11 +36,12 @@ let producer; @@ -36,11 +36,12 @@ let producer;
36 const configEntries = []; 36 const configEntries = [];
37 37
38 let batchMessages = []; 38 let batchMessages = [];
  39 +let batchResolvers = [];
39 let sendLoopInstance; 40 let sendLoopInstance;
40 41
41 function KafkaProducer() { 42 function KafkaProducer() {
42 this.send = (responseTopic, scriptId, rawResponse, headers) => { 43 this.send = (responseTopic, scriptId, rawResponse, headers) => {
43 - logger.debug('Pending queue response, scriptId: [%s]', scriptId); 44 + logger.debug('Pending queue response, scriptId: [%s]', scriptId);
44 const message = { 45 const message = {
45 topic: responseTopic, 46 topic: responseTopic,
46 messages: [{ 47 messages: [{
@@ -50,17 +51,22 @@ function KafkaProducer() { @@ -50,17 +51,22 @@ function KafkaProducer() {
50 }] 51 }]
51 }; 52 };
52 53
53 - pushMessageToSendLater(message);  
54 - return {}; 54 + return pushMessageToSendLater(message);
55 } 55 }
56 } 56 }
57 57
58 function pushMessageToSendLater(message) { 58 function pushMessageToSendLater(message) {
  59 + let resolver;
  60 + const promise = new Promise((resolve, reject) => {
  61 + resolver = resolve;
  62 + });
59 batchMessages.push(message); 63 batchMessages.push(message);
  64 + batchResolvers.push(resolver);
60 if (batchMessages.length >= maxBatchSize) { 65 if (batchMessages.length >= maxBatchSize) {
61 sendMessagesAsBatch(); 66 sendMessagesAsBatch();
62 sendLoopWithLinger(); //reset loop function and reschedule new linger 67 sendLoopWithLinger(); //reset loop function and reschedule new linger
63 } 68 }
  69 + return promise;
64 } 70 }
65 71
66 function sendLoopWithLinger() { 72 function sendLoopWithLinger() {
@@ -77,22 +83,27 @@ function sendMessagesAsBatch() { @@ -77,22 +83,27 @@ function sendMessagesAsBatch() {
77 if (batchMessages.length > 0) { 83 if (batchMessages.length > 0) {
78 logger.debug('sendMessagesAsBatch, length: [%s]', batchMessages.length); 84 logger.debug('sendMessagesAsBatch, length: [%s]', batchMessages.length);
79 const messagesToSend = batchMessages; 85 const messagesToSend = batchMessages;
  86 + const resolvers = batchResolvers;
80 batchMessages = []; 87 batchMessages = [];
  88 + batchResolvers = [];
81 producer.sendBatch({ 89 producer.sendBatch({
82 topicMessages: messagesToSend, 90 topicMessages: messagesToSend,
83 acks: acks, 91 acks: acks,
84 compression: compressionType 92 compression: compressionType
85 }).then( 93 }).then(
86 - () => {  
87 - logger.debug('Response sent to kafka, length: [%s]', messagesToSend.length);  
88 - },  
89 - (err) => {  
90 - logger.error('Failed to send kafka, length: [%s], pending to reprocess msgs', messagesToSend.length);  
91 - batchMessages = messagesToSend.concat(batchMessages);  
92 - logger.error(err.stack); 94 + () => {
  95 + logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length);
  96 + for (let i = 0; i < promisesToSend.length; i++) {
  97 + resolvers[i]();
93 } 98 }
  99 + },
  100 + (err) => {
  101 + logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length);
  102 + logger.error(err.stack);
  103 + batchMessages = messagesToSend.concat(batchMessages);
  104 + batchResolvers = resolvers.concat(batchResolvers); //promises will never be rejected. Will retry forever
  105 + }
94 ); 106 );
95 -  
96 } 107 }
97 } 108 }
98 109