Чтение асинхронного чтения. из . Кафка - PullRequest
0 голосов
/ 28 октября 2019

Я пытаюсь создать потребителя Kafka, который может читать из темы Kafka. Чтобы сделать это, я должен задушить потребителя с помощью async.queue. Это позволяет моему приложению работать с одним потребителем и обрабатывать все в своем темпе. Это в настоящее время работает, за исключением крошечной детали. Я должен создать экземпляр для потребителя и производителей одновременно. И потребитель обрабатывает сообщения быстрее, чем я могу их создать (поскольку производитель зависит от стороннего API).

Моя текущая настройка для обработки выглядит следующим образом:

Потребитель:

  let KafkaConsumer =  new Promise(async function (resolve, reject){
    logEvent(LOG_LEVELS.info, RESPONSE_CODES.LOG_MESSAGE_ONLY, `Going through all the trades `)
    consumer.on('message', async (message) => {
      await q.push(message, function(err){
        if (err) { return }      
        else{
          consumer.resume()
        }
      })
    })
    resolve()
  })
  await q.drain()
  await KafkaConsumer
}

Очередь:

// Intialize a local worker queue with concurrency as 1 (only 1 event is processed at a time)
var q = async.queue((message, cb) => {
  processMessage(message, cb)
}, 1)



q.drain(function() {

  console.log('all items have been processed');

  // Pause the consumer
  consumer.close(function(err){
  })

  // Apply the backtest stats
  testStats()
});

Однако с этой настройкойи, учитывая состояние гонки, очередь истощается слишком быстро (хотя после пары десятков сообщений она стабилизируется), и я не могу произвести достаточно быстро для потребителя, так что я думаю об использовании q.retry для вызова каждый раз, когда очередьсливают. Это позволило бы мне повторить ожидание сообщений и, следовательно, иметь возможность ждать, пока потребитель сам не изменится.

Я не очень уверен, где это будет. вписывается в мою текущую схему, поэтому любая помощь очень ценится.

TIA

...