Commit 0d7adb73eb995fd2434325f3549ea224c8a26789
1 parent
8f53d4a2
Improve js executor kafka consumer to exit application in case of not retryable error.
Showing
1 changed file
with
10 additions
and
0 deletions
@@ -190,6 +190,16 @@ async function sendMessagesAsBatch(isImmediately) { | @@ -190,6 +190,16 @@ async function sendMessagesAsBatch(isImmediately) { | ||
190 | removeListeners[COMMIT_OFFSETS] = consumer.on(COMMIT_OFFSETS, e => logger.info(`consumer COMMIT_OFFSETS topics ${e.payload.topics}`)); | 190 | removeListeners[COMMIT_OFFSETS] = consumer.on(COMMIT_OFFSETS, e => logger.info(`consumer COMMIT_OFFSETS topics ${e.payload.topics}`)); |
191 | */ | 191 | */ |
192 | 192 | ||
193 | + const { CRASH } = consumer.events; | ||
194 | + | ||
195 | + consumer.on(CRASH, e => { | ||
196 | + logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); | ||
197 | + if (!e.payload.restart) { | ||
198 | + logger.error('Going to exit due to not retryable error!'); | ||
199 | + exit(-1); | ||
200 | + } | ||
201 | + }); | ||
202 | + | ||
193 | const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); | 203 | const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); |
194 | await consumer.connect(); | 204 | await consumer.connect(); |
195 | await producer.connect(); | 205 | await producer.connect(); |