Есть ли способ начать потребление тем кафки из заданного смещения c в Java API? - PullRequest
2 голосов
/ 02 мая 2020

Я использую Kafka Stream API. Когда я запускаю свое приложение, иногда возникает разрыв, и я хочу начать потреблять с заданного смещения c. Самое раннее или самое позднее - это не то, что мне нужно.

streamProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Я ищу сценарий, в котором я задаю номер смещения или дату в миллисекундах в файле конфигурации и начинаю потреблять с этого момента. Интересно, есть ли способ добиться этого?

1 Ответ

2 голосов
/ 03 мая 2020

Конфигурация auto.offset.reset действует только при первом запуске приложения, когда смещение еще не зафиксировано. Если смещения фиксируются, приложение всегда возобновляет обработку с принятых смещений.

В Kafka Streams отсутствует API для явной установки начальных смещений. Потребительский API разрешил бы это через Consumer#seek().

. Для Kafka Streams один из способов получить желаемое поведение - остановить приложение, использовать bin/kafka-consmer.group.sh (или, может быть, лучше bin/kafka-streams-application-reset.sh), и зафиксировать желаемое начальное смещение. Если впоследствии вы запустите приложение, оно получит зафиксированное смещение и начнет обработку оттуда.

...