Kafka Streams не увеличивает смещение на 1 при создании темы - PullRequest
0 голосов
/ 11 февраля 2019

Я реализовал простой процессор записи мертвых букв Kafka.

Он отлично работает при использовании записей, созданных производителем консоли.

Однако я считаю, что наши приложения Kafka Streams не гарантируют, чтосоздание записей в разделах приемника, в которых смещения будут увеличиваться на 1 для каждой произведенной записи.

Процессор Dead Letter Background:

У меня есть сценарий, в котором записи могут быть получены до того, как все данные потребуютсяПроцесс опубликован.Когда записи не сопоставляются для обработки приложением потоков, они перемещаются в тему мертвых букв, а не продолжают течь вниз по течению.Когда публикуются новые данные, мы сбрасываем последние сообщения из темы мертвых писем обратно в исходную тему потокового приложения для повторной обработки с новыми данными.

Процессор мертвых писем:

  • В начале выполнения приложение записывает конечные смещения каждого раздела
  • Конечные смещения помечают точку для остановки обработки записей для заданной темы мертвой буквы, чтобы избежать бесконечного цикла, если обработанные записи возвращаются к теме мертвой буквы.
  • Приложение возобновляет работу с последнего смещения, созданного предыдущим прогоном через группы потребителей.
  • Приложение использует транзакции и KafkaProducer#sendOffsetsToTransaction для фиксации последнего сгенерированного смещения.

Чтобы отслеживать, когда все записи в моем диапазоне обрабатываются для раздела темы, моя служба сравнивает свое последнее произведенное смещение от производителя с сохраненной потребителями картой конечных смещений.Когда мы достигаем конечного смещения, потребитель приостанавливает этот раздел с помощью KafkaConsumer#pause, а когда все разделы ставятся на паузу (то есть они достигли сохраненного конечного смещения), затем вызывает его.

* API-интерфейс Kafka 1028* Состояния:

Смещения и позиция потребителя Kafka поддерживает числовое смещение для каждой записи в разделе.Это смещение действует как уникальный идентификатор записи в этом разделе, а также обозначает позицию потребителя в разделе.Например, потребитель, находящийся в позиции 5, использовал записи со смещением от 0 до 4 и затем получит запись со смещением 5.

API-источник Kafka ссылается на следующеесмещение также всегда равно +1.

Отправляет список указанных смещений координатору группы потребителей, а также отмечает эти смещения как часть текущей транзакции.Эти смещения будут считаться зафиксированными, только если транзакция зафиксирована успешно.Фиксированное смещение должно быть следующим сообщением, которое ваше приложение будет использовать, т.е. lastProcessedMessageOffset + 1.

Но в моем отладчике вы можете ясно видеть, что записи, используемые для одного раздела, не увеличиваются на 1 привремя ... enter image description here

Я подумал, может быть, это проблема конфигурации Kafka, такая как max.message.bytes, но в действительности это не имело смысла.Тогда я подумал, что, возможно, это из-за присоединения, но не увидел никакого способа, который изменит способ работы производителя.

Не уверен, что это актуально или нет, но все наши приложения Kafka используют Avro и SchemaРеестр ...

Должны ли смещения всегда увеличиваться на 1 независимо от способа создания или возможно, что использование API потоков Kafka не дает таких же гарантий, как у обычных клиентов Producer Consumer?

Я просто что-то упускаю?

Ответы [ 2 ]

0 голосов
/ 11 февраля 2019

Я знаю, что знание смещения сообщений может быть полезным.Однако Кафка гарантирует только то, что смещение сообщения-X будет больше, чем смещение последнего сообщения (X-1).Кстати, идеальное решение не должно основываться на расчетах смещения.

Под капотом производитель кафки может попытаться отправить сообщения повторно.Кроме того, если брокер выходит из строя, может произойти перебалансировка.Ровно семантика может добавить дополнительное сообщение.Поэтому смещение вашего сообщения может измениться, если произойдет какое-либо из указанных выше событий.

Kafka может добавить дополнительные сообщения для внутреннего использования в тему.Но потребительский API Kafka может отбрасывать эти внутренние сообщения.Следовательно, вы можете видеть только свои сообщения, и смещения сообщений могут не обязательно увеличиваться на 1.

0 голосов
/ 11 февраля 2019

Официальный контракт API не предусматривает, что смещения сообщений увеличиваются на единицу, даже если JavaDocs указывают это (кажется, что JavaDocs должны быть обновлены).

  • Если вы неЕсли вы не используете транзакции, вы получаете либо семантику по крайней мере один раз, либо никаких гарантий (некоторые называют эту семантику по крайней мере один раз).По крайней мере, один раз записи могут быть записаны дважды, и поэтому смещения для двух последовательных сообщений на самом деле не увеличиваются на единицу, поскольку дублирующая запись «потребляет» два смещения.

  • Если выиспользовать транзакции, каждая фиксация (или прерывание) транзакции записывает маркер фиксации (или отмены) в тему - эти транзакционные маркеры также «потребляют» одно смещение (это то, что вы наблюдаете).

Таким образом, в общем случае вы не должны полагаться на последовательные смещения.Единственная гарантия, которую вы получаете, заключается в том, что каждое смещение уникально в пределах раздела.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...