NodeJS Kafka Consumer получает дубликаты сообщений? - PullRequest
1 голос
/ 10 июля 2019

Я использую Kafka-Node в своем приложении NodeJS для создания и получения сообщений.Я начинаю потребителя, который ждет мою тему.Затем я запускаю продюсера и отправляю сообщения Кафке.Мой потребитель вставляет каждое из этих сообщений в базу данных Postgres.

Для одного потребителя это работает нормально.

Когда я останавливаю потребителя и продолжаю производить, я перезагружаю потребителя о30 секунд спустяЯ заметил, что существует около дюжины или около того сообщений, которые уже были вставлены в БД от исходного потребителя.

Я предполагаю, что когда я убивал потребителя, были смещения, которые еще не были зафиксированы, и поэтомувторой потребитель их подбирает?

Как лучше всего справиться с этой ситуацией?

var kafka = require('kafka-node');
var utilities = require('./utilities');
var topics = ['test-ingest', 'test-ingest2'];
var groupName = 'test';

var options = {
    groupId: groupName,
    autoCommit: false,
    sessionTimeout: 15000,
    fetchMaxBytes: 1024 * 1024,
    protocol: ['roundrobin'],
    fromOffset: 'latest',
    outOfRangeOffset: 'earliest'
};

var consumerGroup = new kafka.ConsumerGroup(options, topics);

// Print the message
consumerGroup.on('message', function (message) {

    // Submit our message into postgres - return a promise
    utilities.storeRecord(message).then((dbResult) => {

        // Commit the offset
        consumerGroup.commit((error, data) => {
            if (error) {
                console.error(error);
            } else {
                console.log('Commit success: ', data);
            }
        });

    });

});

1 Ответ

0 голосов
/ 10 июля 2019

Я не могу сказать, почему fromOffset: 'latest' не работает для вас. Простой обходной путь - использовать offset.fetchLatestOffsets, чтобы получить последнее смещение и затем использовать его с этой точки и далее.

...