Я использую Kafka-Node
в своем приложении NodeJS для создания и получения сообщений.Я начинаю потребителя, который ждет мою тему.Затем я запускаю продюсера и отправляю сообщения Кафке.Мой потребитель вставляет каждое из этих сообщений в базу данных Postgres.
Для одного потребителя это работает нормально.
Когда я останавливаю потребителя и продолжаю производить, я перезагружаю потребителя о30 секунд спустяЯ заметил, что существует около дюжины или около того сообщений, которые уже были вставлены в БД от исходного потребителя.
Я предполагаю, что когда я убивал потребителя, были смещения, которые еще не были зафиксированы, и поэтомувторой потребитель их подбирает?
Как лучше всего справиться с этой ситуацией?
var kafka = require('kafka-node');
var utilities = require('./utilities');
var topics = ['test-ingest', 'test-ingest2'];
var groupName = 'test';
var options = {
groupId: groupName,
autoCommit: false,
sessionTimeout: 15000,
fetchMaxBytes: 1024 * 1024,
protocol: ['roundrobin'],
fromOffset: 'latest',
outOfRangeOffset: 'earliest'
};
var consumerGroup = new kafka.ConsumerGroup(options, topics);
// Print the message
consumerGroup.on('message', function (message) {
// Submit our message into postgres - return a promise
utilities.storeRecord(message).then((dbResult) => {
// Commit the offset
consumerGroup.commit((error, data) => {
if (error) {
console.error(error);
} else {
console.log('Commit success: ', data);
}
});
});
});