Kafka Streams: как получить пределы смещения до подачи заявления на повторную обработку и как его остановить - PullRequest
1 голос
/ 15 марта 2019

Я ищу способ сделать инструмент повторной обработки в KafkaStreams, который позволит повторно обрабатывать данные с самого начала в теме (применяя некоторые фильтры и записывая обновленные версии этих событий в одну и ту же тему). В то же время существует долгосрочное приложение, обрабатывающее данные из этой темы.

Для повторной обработки только до момента времени , когда приложение запускается и останавливается после него, необходимо знать, когда следует остановиться, что является последним смещением, произведенным в этой точке. Например. карта может быть построена до запуска топологии, которая будет иметь (разбиение -> смещение), чтобы знать эти ограничения, поэтому приложение сможет остановиться, когда будет достигнуто это смещение, сравнивая текущий раздел и смещение (через Processor API) с предел смещения на этой исходной карте.

Возможно ли / имеет ли смысл получать информацию о последних смещениях из Kafka Streams? Есть ли другой способ обойти это? (Я полагаю, что вы можете создать его через обычных потребителей Kafka, ища конец и получая позицию, но я спрашиваю, есть ли интегрированное решение в KafkaStreams).

Кроме того, как аккуратно остановить приложение, только когда все разделы достигли своего смещения, зная, что эта информация распространяется, поэтому вам нужно будет знать состояние всех экземпляров?

Kafka / KafkaStreams 2.1, Scala 2.12

1 Ответ

1 голос
/ 17 марта 2019

Использование потребителя для получения конечного смещения кажется разумным.Для остановки приложения вам нужно будет создать ручное решение, которое будет отслеживать прогресс.Например, используя transformValues(), вы можете проверить имя темы, раздел и смещение входной записи (используя объект context, предоставленный методом init()).Это должно позволить вам позвонить KafkaStreams#close(), когда все данные обработаны.

Вас может заинтересовать этот KIP (в активном банкомате), в котором обсуждались похожие идеи: https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams

...