У меня есть приложение akka (в JAVA), которое использует commitablePartitionedSource
для получения сообщений из тем kafka. У меня есть несколько групп потребителей, которые раскручивают потребителей по нескольким темам. Это обусловлено динамической конфигурацией, в которой я могу временно отключить потребителей и, возможно, запустить их позже.
Когда этот потребитель перезапускается, я хочу только читать новые сообщения, а не начинать с того места, где остановился.
Есть ли способ получить объект kafkaConsumer от потребителя akka-alpakka, чтобы я мог искать ToEnd () перед обработкой?
Пожалуйста, дайте мне знать, есть ли другой способ добиться этого? Может быть с конфигурацией Акка или другой тип потребителя? Я предпочитаю не поддерживать свои собственные смещения (надеюсь, не единственный вариант)
Моя конфигурация настроена на получение смещения latest
, когда я запускаю группу потребителей, но, поскольку я завершаю работу и перезапускаю отдельных потребителей, она всегда начинает потреблять там, где я остановилась.
Я пытался создать группу потребителей для темы, но у меня много тем, и это оказалось довольно ресурсоемким. Я также искал способ безуспешно очистить смещения, сохраненные в kafka для этой темы.