tl; dr; Я пытаюсь понять, как один потребитель, которому назначено несколько разделов, обрабатывает использование записей для расширенного раздела.
Например:
- Полностью обрабатывает один раздел перед переходом к следующему.
- Обрабатывать порцию доступных записей из каждого раздела каждый раз.
- Обрабатывать пакет из N записей из первых доступных разделов
- Обрабатывать пакет из N записей из разделов в цикле-ротация вращения
Я нашел конфигурацию partition.assignment.strategy
для присваивателей Ranged
или RoundRobin
, но это только определяет, как потребителям назначаются разделы, а не как потребляет их из разделов, которым он назначен.
Я начал копаться в источнике KafkaConsumer и # poll () привел меня к # pollForFetches () # pollForFetches () , затем привел меня fetcher # fetchedRecords () и fetcher # sendFetches ()
Это просто заставило меня попытаться проследить весь класс Fetcher всевместе и, может быть, уже поздно, или, может быть, я просто не стал разбираться в том, что далеко зашло, но у меня возникают проблемы с определением того, как потребитель будет обрабатывать несколько назначенных разделов.
Справочная информация
Работа над конвейером данных, поддерживаемым Kafka Streams.
На нескольких этапах в этом конвейере, когда записи обрабатываются различными приложениями Kafka Streams, поток присоединяется к сжатым темам, передаваемым из внешних источников данных, которые предоставляютнеобходимые данные, которые будут дополнены в записях, прежде чем переходить к следующему этапу обработки.
По пути есть несколько тем мертвых букв, где записи не могут быть сопоставлены с внешними источниками данных, которые могли бы увеличитьзапись.Это может быть связано с тем, что данные просто еще не доступны (событие или кампания еще не активны) или это неверные данные и никогда не будут совпадать.
Цель состоит в том, чтобы когда-либо переиздавать записи из темы недоставленных сообщенийпубликуются новые дополненные данные, так что мы можем сопоставлять ранее несопоставленные записи из темы мертвых писем, чтобы обновлять их и отправлять их в поток для дополнительной обработки.
Записи могут не совпадать при нескольких попытках и могут иметьнесколько копий в теме недоставленных сообщений, поэтому мы хотим обрабатывать только существующие записи (до последнего смещения при запуске приложения), а также записи, отправленные в тему недоставленных сообщений с момента последнего запуска приложения (после ранее сохраненногосмещения групп потребителей).
Это работает хорошо, так как мой потребитель отфильтровывает любые записи, поступающие после запуска приложения, и мой продюсер управляет смещениями моей группы потребителей, фиксируя смещения как часть публикацииОперация ing.
Но я хочу убедиться, что в конечном итоге я буду использовать все разделы, поскольку столкнулся со странным случаем, когда несвязанные записи обрабатываются и попадают в тот же раздел, что и ранее, в теме недоставленных сообщений.только чтобы быть отфильтрованным потребителем.И хотя он не получает новые партии записей для обработки, есть разделы, которые еще не были обработаны.
Любая помощь в понимании того, как один потребитель обрабатывает несколько назначенных разделов, будет принята с благодарностью.