Я ищу способ сделать инструмент повторной обработки в KafkaStreams, который позволит повторно обрабатывать данные с самого начала в теме (применяя некоторые фильтры и записывая обновленные версии этих событий в одну и ту же тему). В то же время существует долгосрочное приложение, обрабатывающее данные из этой темы.
Для повторной обработки только до момента времени , когда приложение запускается и останавливается после него, необходимо знать, когда следует остановиться, что является последним смещением, произведенным в этой точке. Например. карта может быть построена до запуска топологии, которая будет иметь (разбиение -> смещение), чтобы знать эти ограничения, поэтому приложение сможет остановиться, когда будет достигнуто это смещение, сравнивая текущий раздел и смещение (через Processor API) с предел смещения на этой исходной карте.
Возможно ли / имеет ли смысл получать информацию о последних смещениях из Kafka Streams? Есть ли другой способ обойти это?
(Я полагаю, что вы можете создать его через обычных потребителей Kafka, ища конец и получая позицию, но я спрашиваю, есть ли интегрированное решение в KafkaStreams).
Кроме того, как аккуратно остановить приложение, только когда все разделы достигли своего смещения, зная, что эта информация распространяется, поэтому вам нужно будет знать состояние всех экземпляров?
Kafka / KafkaStreams 2.1, Scala 2.12