librdkafka C API Kafka Consumer не правильно читает все сообщения - PullRequest
0 голосов
/ 31 января 2019

Я использую librdkafka C API потребителя (в частности, rd_kafka_consumer_poll для чтения, и я звонил rd_kafka_poll_set_consumer до этого)

Проблема, которую я вижу, заключается в том, что в моем тесте Google я выполняю следующие действия

  1. написать 3 сообщения для kafka

  2. init / start потребитель kafka (rd_kafka_consumer_poll)

  3. inrebalance_cb Я установил смещение каждого раздела на RD_KAFKA_OFFSET_STORED и назначил их для обработки

  4. На данный момент я считаю, что он должен прочитать 3 сообщения, но он читает только последнее сообщение, но на удивление смещение для каждогораздел уже обновлен!

Я что-то упускаю при использовании Kafka?

И еще один вопрос: я изначально думал, что сохраненное смещение находится в брокере Kafka и существует уникальныйсмещение для темы + идентификатор группы потребителей + комбинация разделов.

Так что я думал, что разные группы потребителей, читающие одну и ту же тему, должны иметь разное смещение.

Однако это не похоже на случай.Я всегда читаю с одного и того же смещения, когда используются разные группы потребителей.

Я подозреваю, что это может быть связано со смещенным коммитом, но не знаю, где с этим справиться.

Есть идеи?

1 Ответ

0 голосов
/ 31 января 2019

Конфигурация для просмотра: auto.offset.reset

От Документация потребителя Kakfa :

Что делать, если в Kafka нет начального смещения или еслитекущее смещение больше не существует на сервере

Из документации librdkafka :

Действие, которое необходимо выполнить, если в смещении нет начального смещенияstore или желаемое смещение выходит за пределы диапазона: «наименьшее», «самое раннее» - автоматически сбрасывает смещение на наименьшее смещение, «наибольшее», «последнее» - автоматически сбрасывает смещение на наибольшее смещение, «ошибка» - вызываетошибка, которая получается путем использования сообщений и проверки 'message-> err'.Тип: enum value

Значение по умолчанию: последний .

Кроме того,

#define RD_KAFKA_OFFSET_STORED -1000

Итак, вы пытаетесь установить разделсмещение -1000, которое явно не является допустимым смещением.Судя по всему, librdkafka читает последнее сообщение в этом случае (я не проверял код).

...