Я пытаюсь создать потребителя Kafka, который может читать из темы Kafka. Чтобы сделать это, я должен задушить потребителя с помощью async.queue. Это позволяет моему приложению работать с одним потребителем и обрабатывать все в своем темпе. Это в настоящее время работает, за исключением крошечной детали. Я должен создать экземпляр для потребителя и производителей одновременно. И потребитель обрабатывает сообщения быстрее, чем я могу их создать (поскольку производитель зависит от стороннего API).
Моя текущая настройка для обработки выглядит следующим образом:
Потребитель:
let KafkaConsumer = new Promise(async function (resolve, reject){
logEvent(LOG_LEVELS.info, RESPONSE_CODES.LOG_MESSAGE_ONLY, `Going through all the trades `)
consumer.on('message', async (message) => {
await q.push(message, function(err){
if (err) { return }
else{
consumer.resume()
}
})
})
resolve()
})
await q.drain()
await KafkaConsumer
}
Очередь:
// Intialize a local worker queue with concurrency as 1 (only 1 event is processed at a time)
var q = async.queue((message, cb) => {
processMessage(message, cb)
}, 1)
q.drain(function() {
console.log('all items have been processed');
// Pause the consumer
consumer.close(function(err){
})
// Apply the backtest stats
testStats()
});
Однако с этой настройкойи, учитывая состояние гонки, очередь истощается слишком быстро (хотя после пары десятков сообщений она стабилизируется), и я не могу произвести достаточно быстро для потребителя, так что я думаю об использовании q.retry для вызова каждый раз, когда очередьсливают. Это позволило бы мне повторить ожидание сообщений и, следовательно, иметь возможность ждать, пока потребитель сам не изменится.
Я не очень уверен, где это будет. вписывается в мою текущую схему, поэтому любая помощь очень ценится.
TIA