Как прочитать сообщение одно за другим в node-rdkafka - PullRequest
1 голос
/ 27 января 2020

Я использую node-rdkafka (https://github.com/Blizzard/node-rdkafka) для приема сообщений, настройка basi c работает нормально, но она вызывает функцию каждый раз, когда я помещаю что-то в очередь sh , независимо от завершения предыдущего метода.

Я хочу, чтобы следующий блок данных был запущен после выполнения предыдущей функции.

здесь моя реализация

const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const consumer = new Kafka.KafkaConsumer({
    'group.id':'consumer',
    'metadata.broker.list': '*******',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '********',
    'sasl.password': '********',
    'security.protocol': 'SASL_SSL',
    'enable.auto.commit':false
}, {});

// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
    if (err) {
        console.log(`Error connecting to Kafka broker: ${err}`);
        process.exit(-1);
    }

});
let is_pause = false;
consumer.on('ready', (arg)=>{
    console.log('consumer ready.' + JSON.stringify(arg));
    console.log('Consumer is ready');
    consumer.subscribe([topic]);
    setInterval(function() {
        console.log('consumer has consume on :'+timeMs());  
        consumer.consume();
      }, 1000);
});

consumer.on('data',async (data)=>{
    console.log('consumer is consuming data');
    if(!is_pause) {
        is_pause = true;
        if(data && typeof data !== 'undefined') {
            try {
                console.log('consumer received the data');
                consumer.pause([topic]);
                console.log('consumer has pause the consuming');
                await processMessage(data);
                console.log('consumer is resumed');
                consumer.resume([topic]);
                is_pause = false;
            } catch(error) {
                console.log('data consuming error');
                console.log(error);
            }
        } else {
            is_pause = false;
        }
    }
});


1 Ответ

0 голосов
/ 27 января 2020

Вы звоните consume() (без каких-либо аргументов), который возвращает сообщения как можно быстрее.

Если вы хотите контролировать темп потребления, вы можете использовать другой метод consume(size), который возвращает size Кафка записей. Например, consume(1) вернет следующую запись Кафки.

См. Документацию для пользователей node-rdkafka .

...