Узел Кафка - повторяющиеся сообщения приходят - PullRequest
0 голосов
/ 22 февраля 2019

Я получаю дубликаты сообщений при использовании сообщений Kafka с помощью группы потребителей.

Я использую эту библиотеку Nodejs.https://www.npmjs.com/package/kafka-node

Мой потребительский код указан ниже

const config = require( '../../configs' );
const kafka = require( 'kafka-node' );

var options = {
    id: 'consumer1',
    kafkaHost: config.kafka.prod.kafka_host, //multiple kafka hosts (comma separated)
    groupId: "test-group2",
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'earliest'
};

var consumerGroup = new kafka.ConsumerGroup( options, 'my-replicated-topic3' );

consumerGroup.on( 'message', function ( message ) {
    console.log( message );
} );

Я получаю результат ниже.

{ topic: 'my-replicated-topic3',
  value: '{"meta":{"topic":"my-replicated-topic3","added_at":"2019-02-22T09:25:54.708Z","server":"cron"},"data":"1550827554708 ::: Totam quis qui. Sit dolore laboriosam odio. Facilis porro et quam repellat pariatur. Ad voluptatem quidem."}',
  offset: 8941,
  partition: 0,
  highWaterOffset: 8966,
  key: null }
  ---
  ---
  ---
  ---
  { topic: 'my-replicated-topic3',
  value: '{"meta":{"topic":"my-replicated-topic3","added_at":"2019-02-22T09:25:54.708Z","server":"cron"},"data":"1550827554708 ::: Totam quis qui. Sit dolore laboriosam odio. Facilis porro et quam repellat pariatur. Ad voluptatem quidem."}',
  offset: 8941,
  partition: 0,
  highWaterOffset: 8970,
  key: null }

Вы видите, что одно и то же сообщение повторяется после каждогонесколько записей.Здесь смещение сообщения такое же, но highWaterOffset отличается от всех повторяющихся сообщений.

Пожалуйста, предложите способ исправить это.

1 Ответ

0 голосов
/ 22 февраля 2019

Реанимируете ли вы своего потребителя?Как ваш потребитель фиксирует свои смещения?

По умолчанию ваш потребитель автоматически фиксирует свое смещение каждые 5 секунд.Так обстоит дело с вашей библиотекой.

Если вы повторно создадите экземпляр своего потребителя до фиксации смещения, он будет перезапущен с последнего зафиксированного смещения.

...