Я реализовал простой процессор записи мертвых букв Kafka.
Он отлично работает при использовании записей, созданных производителем консоли.
Однако я считаю, что наши приложения Kafka Streams не гарантируют, чтосоздание записей в разделах приемника, в которых смещения будут увеличиваться на 1 для каждой произведенной записи.
Процессор Dead Letter Background:
У меня есть сценарий, в котором записи могут быть получены до того, как все данные потребуютсяПроцесс опубликован.Когда записи не сопоставляются для обработки приложением потоков, они перемещаются в тему мертвых букв, а не продолжают течь вниз по течению.Когда публикуются новые данные, мы сбрасываем последние сообщения из темы мертвых писем обратно в исходную тему потокового приложения для повторной обработки с новыми данными.
Процессор мертвых писем:
- В начале выполнения приложение записывает конечные смещения каждого раздела
- Конечные смещения помечают точку для остановки обработки записей для заданной темы мертвой буквы, чтобы избежать бесконечного цикла, если обработанные записи возвращаются к теме мертвой буквы.
- Приложение возобновляет работу с последнего смещения, созданного предыдущим прогоном через группы потребителей.
- Приложение использует транзакции и
KafkaProducer#sendOffsetsToTransaction
для фиксации последнего сгенерированного смещения.
Чтобы отслеживать, когда все записи в моем диапазоне обрабатываются для раздела темы, моя служба сравнивает свое последнее произведенное смещение от производителя с сохраненной потребителями картой конечных смещений.Когда мы достигаем конечного смещения, потребитель приостанавливает этот раздел с помощью KafkaConsumer#pause
, а когда все разделы ставятся на паузу (то есть они достигли сохраненного конечного смещения), затем вызывает его.
* API-интерфейс Kafka 1028* Состояния:
Смещения и позиция потребителя Kafka поддерживает числовое смещение для каждой записи в разделе.Это смещение действует как уникальный идентификатор записи в этом разделе, а также обозначает позицию потребителя в разделе.Например, потребитель, находящийся в позиции 5, использовал записи со смещением от 0 до 4 и затем получит запись со смещением 5.
API-источник Kafka ссылается на следующеесмещение также всегда равно +1.
Отправляет список указанных смещений координатору группы потребителей, а также отмечает эти смещения как часть текущей транзакции.Эти смещения будут считаться зафиксированными, только если транзакция зафиксирована успешно.Фиксированное смещение должно быть следующим сообщением, которое ваше приложение будет использовать, т.е. lastProcessedMessageOffset + 1.
Но в моем отладчике вы можете ясно видеть, что записи, используемые для одного раздела, не увеличиваются на 1 привремя ...
Я подумал, может быть, это проблема конфигурации Kafka, такая как max.message.bytes
, но в действительности это не имело смысла.Тогда я подумал, что, возможно, это из-за присоединения, но не увидел никакого способа, который изменит способ работы производителя.
Не уверен, что это актуально или нет, но все наши приложения Kafka используют Avro и SchemaРеестр ...
Должны ли смещения всегда увеличиваться на 1 независимо от способа создания или возможно, что использование API потоков Kafka не дает таких же гарантий, как у обычных клиентов Producer Consumer?
Я просто что-то упускаю?