При попытке получить и обработать сообщения из темы Кафки все мои сообщения расходуются в ошибочном порядке, и в конечном итоге перегрузка и сбой Кафки.
Я попытался добавить в порядок pause () и resume ()контролировать пропускную способность. Кажется, что это не режет.
consumer = new Consumer(client,[{ topic: process.env.TOPIC , partition: 0 }]);
logEvent(LOG_LEVELS.info, RESPONSE_CODES.LOG_MESSAGE_ONLY, `Going through all the trades `)
let KafkaConsumer = new Promise(async function (resolve, reject){
consumer.on('message', async function (message) {
consumer.pause()
await timeout(0).then(async()=>{
// Parse the value consumed from kafka
parsedPrice = JSON.parse(message.value)
console.log(parsedPrice.timestamp)
// Populate the Arrays with the appropriate data
open.push(parsedPrice.open)
close.push(parsedPrice.close)
high.push(parsedPrice.high)
low.push(parsedPrice.low)
// Strategy call saved into an object
StratObject = await strategy(open,close,high,low)
//if object.execute = true execute the given trade.
if (StratObject.execute == true){
await testOrder(StratObject)
}})
.then(async()=>{
if(message.offset == (message.highWaterOffset-1)){
logEvent(LOG_LEVELS.info, RESPONSE_CODES.LOG_MESSAGE_ONLY, `No more trades to analyse`)
// Resolve
resolve(message)
// Apply the backtest stats
// testStats()
}
else{
consumer.resume()
}
})
})
})
await KafkaConsumer
Этот код приводит к аварийному завершению работы kafka brker с кодом:
nhandledPromiseRejectionWarning: BrokerNotAvailableError: Broker not available: Broker socket is closed
at new BrokerNotAvailableError (/usr/src/bots/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
Я ожидаю, что он решит каким-то образом поставить в очередь все полученные сообщения и обработать их один за другим независимо от скоростина котором я получаю сообщения