Дроссельная кафка Запросы - PullRequest
0 голосов
/ 20 октября 2019

При попытке получить и обработать сообщения из темы Кафки все мои сообщения расходуются в ошибочном порядке, и в конечном итоге перегрузка и сбой Кафки.

Я попытался добавить в порядок pause () и resume ()контролировать пропускную способность. Кажется, что это не режет.

consumer = new Consumer(client,[{ topic: process.env.TOPIC , partition: 0 }]);
    logEvent(LOG_LEVELS.info, RESPONSE_CODES.LOG_MESSAGE_ONLY, `Going through all the trades `)

      let KafkaConsumer =  new Promise(async function (resolve, reject){
        consumer.on('message', async function (message) {
          consumer.pause()
          await timeout(0).then(async()=>{
            // Parse the value consumed from kafka 
            parsedPrice = JSON.parse(message.value)
            console.log(parsedPrice.timestamp)

            // Populate the Arrays with the appropriate data
            open.push(parsedPrice.open)
            close.push(parsedPrice.close)
            high.push(parsedPrice.high)
            low.push(parsedPrice.low)

            // Strategy call saved into an object
            StratObject = await strategy(open,close,high,low)


            //if object.execute = true execute the given trade.
            if (StratObject.execute == true){
              await testOrder(StratObject)
            }})
            .then(async()=>{
              if(message.offset == (message.highWaterOffset-1)){
                logEvent(LOG_LEVELS.info, RESPONSE_CODES.LOG_MESSAGE_ONLY, `No more trades to analyse`)

                // Resolve
                resolve(message)

                // Apply the backtest stats
                // testStats()
              }
              else{
                consumer.resume()
              }
          })
        })
      })
    await KafkaConsumer

Этот код приводит к аварийному завершению работы kafka brker с кодом:

nhandledPromiseRejectionWarning: BrokerNotAvailableError: Broker not available: Broker socket is closed
    at new BrokerNotAvailableError (/usr/src/bots/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)

Я ожидаю, что он решит каким-то образом поставить в очередь все полученные сообщения и обработать их один за другим независимо от скоростина котором я получаю сообщения

...