Первый опрос потребителей Kafka не возвращает сообщения topi c. Что может быть не так? - PullRequest
0 голосов
/ 26 марта 2020

У меня есть простая установка Kafka 2.4.1 (Confluent 5.4.1), работающая локально в Docker. И я использую тестового производителя и тестового потребителя, написанного в Java. Код доступен в GitHub .

Модульные тесты:

  • производитель выдает одно сообщение для top-single-section-topi c
  • потребитель подписывается на topi c и опрашивает Kafka на наличие доступных сообщений

Проблема в том, что при первом запуске потребителя будут пропущены уже созданные сообщения, доступные в topi c. Проблема real в том, что эти пропущенные сообщения теряются (с точки зрения потребителя: смещение перемещается в самое последнее в topi c, а отставание равно 0 <- все это видно в <a href="http://www.kafkatool.com/" rel="nofollow noreferrer"> Kafka Tool )

Результаты после первого запуска:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:26:51 CET 2020
Offset: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.686 sec
Running com.example.TestKafkaConsumer
Record count: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.561 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

2-й запуск тестов дает:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:28:08 CET 2020
Offset: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.538 sec
Running com.example.TestKafkaConsumer
Record count: 1
offset = 1, key = static-key, value = this is the string message at Thu Mar 26 10:28:08 CET 2020
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

Я пытался с разными вариациями и всегда результат один и тот же:

  • дождитесь, пока Кафка "прогреется", прежде чем выдать первое сообщение
  • подождите некоторое время между производством и потреблением
  • выдает несколько сообщений до первого потребления
  • , произведенных и потребленных без проверки с помощью Kafka Tool (во избежание неизвестного вмешательства третьих лиц)

Иногда я также заметил, что Второй прогон потребителя все же пропустил произведенные события.

1 Ответ

1 голос
/ 26 марта 2020

Глядя на код в вашем репозитории GitHub, кажется, что вы не устанавливаете конфигурацию Consumer auto.offset.reset. Согласно документации этот параметр по умолчанию равен latest. Это означает, что если Consumer Group не известна брокеру для вашего теста topi c, он будет только ждать новых входящих сообщений. Поэтому сообщения, написанные вашим тестом продюсера заранее , не могут быть использованы TestConsumer.

Эта документация дает хорошее объяснение концепции группы потребителей в Кафка.

...