Как Kafka Consumer потребляет данные из нескольких назначенных разделов - PullRequest
0 голосов
/ 07 февраля 2019

tl; dr; Я пытаюсь понять, как один потребитель, которому назначено несколько разделов, обрабатывает использование записей для расширенного раздела.

Например:

  • Полностью обрабатывает один раздел перед переходом к следующему.
  • Обрабатывать порцию доступных записей из каждого раздела каждый раз.
  • Обрабатывать пакет из N записей из первых доступных разделов
  • Обрабатывать пакет из N записей из разделов в цикле-ротация вращения

Я нашел конфигурацию partition.assignment.strategy для присваивателей Ranged или RoundRobin, но это только определяет, как потребителям назначаются разделы, а не как потребляет их из разделов, которым он назначен.

Я начал копаться в источнике KafkaConsumer и # poll () привел меня к # pollForFetches () # pollForFetches () , затем привел меня fetcher # fetchedRecords () и fetcher # sendFetches ()

Это просто заставило меня попытаться проследить весь класс Fetcher всевместе и, может быть, уже поздно, или, может быть, я просто не стал разбираться в том, что далеко зашло, но у меня возникают проблемы с определением того, как потребитель будет обрабатывать несколько назначенных разделов.

Справочная информация

Работа над конвейером данных, поддерживаемым Kafka Streams.

На нескольких этапах в этом конвейере, когда записи обрабатываются различными приложениями Kafka Streams, поток присоединяется к сжатым темам, передаваемым из внешних источников данных, которые предоставляютнеобходимые данные, которые будут дополнены в записях, прежде чем переходить к следующему этапу обработки.

По пути есть несколько тем мертвых букв, где записи не могут быть сопоставлены с внешними источниками данных, которые могли бы увеличитьзапись.Это может быть связано с тем, что данные просто еще не доступны (событие или кампания еще не активны) или это неверные данные и никогда не будут совпадать.

Цель состоит в том, чтобы когда-либо переиздавать записи из темы недоставленных сообщенийпубликуются новые дополненные данные, так что мы можем сопоставлять ранее несопоставленные записи из темы мертвых писем, чтобы обновлять их и отправлять их в поток для дополнительной обработки.

Записи могут не совпадать при нескольких попытках и могут иметьнесколько копий в теме недоставленных сообщений, поэтому мы хотим обрабатывать только существующие записи (до последнего смещения при запуске приложения), а также записи, отправленные в тему недоставленных сообщений с момента последнего запуска приложения (после ранее сохраненногосмещения групп потребителей).

Это работает хорошо, так как мой потребитель отфильтровывает любые записи, поступающие после запуска приложения, и мой продюсер управляет смещениями моей группы потребителей, фиксируя смещения как часть публикацииОперация ing.

Но я хочу убедиться, что в конечном итоге я буду использовать все разделы, поскольку столкнулся со странным случаем, когда несвязанные записи обрабатываются и попадают в тот же раздел, что и ранее, в теме недоставленных сообщений.только чтобы быть отфильтрованным потребителем.И хотя он не получает новые партии записей для обработки, есть разделы, которые еще не были обработаны.

Любая помощь в понимании того, как один потребитель обрабатывает несколько назначенных разделов, будет принята с благодарностью.

1 Ответ

0 голосов
/ 07 февраля 2019

Вы были на правильном пути, глядя на Fetcher, поскольку большая часть логики присутствует.

Сначала, как упоминает Consumer Javadoc :

Если потребителю назначено несколько разделов для извлечения данных, он будет пытаться использовать их все одновременно, эффективно предоставляя этим разделам одинаковый приоритет для потребления.

Как вы можете себе представить,на практике есть несколько вещей, которые необходимо учитывать.

  • Каждый раз, когда потребитель пытается получить новые записи, он исключает разделы, для которых у него уже есть ожидающие записи (изпредыдущий выбор).Разделы, в которых уже есть запрос на выборку в полете, также исключаются.

  • При извлечении записей потребитель указывает fetch.max.bytes и max.partition.fetch.bytes в запросе на выборку.Они используются брокерами, чтобы, соответственно, определить, сколько данных вернуть в общей сложности и по разделу.Это в равной степени относится ко всем разделам.

При использовании этих двух подходов Потребитель по умолчанию пытается использовать все разделы справедливо.Если это не так, обычно помогает изменение fetch.max.bytes или max.partition.fetch.bytes.

В случае, если вы хотите расставить приоритеты для одних разделов над другими, вам нужно использовать pause() и resume() для ручного управления потоком потребления.

...