Как мне завершить тему кафки, когда я создаю нового потребителя в существующей группе потребителей с потоками akka? - PullRequest
1 голос
/ 24 июня 2019

У меня есть приложение akka (в JAVA), которое использует commitablePartitionedSource для получения сообщений из тем kafka. У меня есть несколько групп потребителей, которые раскручивают потребителей по нескольким темам. Это обусловлено динамической конфигурацией, в которой я могу временно отключить потребителей и, возможно, запустить их позже.

Когда этот потребитель перезапускается, я хочу только читать новые сообщения, а не начинать с того места, где остановился.

Есть ли способ получить объект kafkaConsumer от потребителя akka-alpakka, чтобы я мог искать ToEnd () перед обработкой? Пожалуйста, дайте мне знать, есть ли другой способ добиться этого? Может быть с конфигурацией Акка или другой тип потребителя? Я предпочитаю не поддерживать свои собственные смещения (надеюсь, не единственный вариант)

Моя конфигурация настроена на получение смещения latest, когда я запускаю группу потребителей, но, поскольку я завершаю работу и перезапускаю отдельных потребителей, она всегда начинает потреблять там, где я остановилась.

Я пытался создать группу потребителей для темы, но у меня много тем, и это оказалось довольно ресурсоемким. Я также искал способ безуспешно очистить смещения, сохраненные в kafka для этой темы.

1 Ответ

1 голос
/ 24 июня 2019

Самый простой способ - создать новую группу потребителей каждый раз, когда вы перезапускаете своего потребителя. Kafka позаботится об удалении устаревших групп потребителей по истечении заданного промежутка времени ( retention.ms ).

Эта стратегия хороша, если вы редко перезапускаете своего потребителя и всегда хотите, чтобы он обрабатывал свежие данные, а не отслеживал все пропущенные сообщения.

EDIT

Насколько я знаю, единственный способ получить доступ к базовому KafkaConsumer - это использовать committableExternalSource. Таким образом, у вас будет доступ к методу seekToEnd, однако вам также необходимо позаботиться о подписке на тему, предоставляющую начальное смещение для раздела (аналогично тому, как вы сейчас настраиваете committablePartitionedSource, но вне Акка ).

...