позвольте мне описать обоснование моего вопроса:
- У нас есть приложение на базе Micronaut, принимающее сообщения от брокера Kafka.
- Использованные сообщения обрабатываются и передаются на другой пульт.«нисходящее» приложение.
- Если это нисходящее приложение намеренно перезапустится, потребуется некоторое время, чтобы подготовиться к принятию дальнейших сообщений от нашего приложения на базе Micronaut.
Итак, мыесть идея отправить в приложение Micronaut запрос на SUSPEND / PAUSE потребление сообщений от Kafka (например, через HTTP на соответствующую конечную точку).Интерфейс KafkaConsumer, похоже, имеет соответствующие методы для достижения этой цели, например
public void pause(java.util.Collection<TopicPartition> partitions)
public void resume(java.util.Collection<TopicPartition> partitions)
Но как получить ссылку на соответствующий экземпляр KafkaConsumer, поданный в нашу конечную точку HTTP?
Мы пыталисьчтобы вставить его в конструктор класса конечной точки / контроллера HTTP, но это дает
Error instantiating bean of type [HttpController]
Message: Missing bean arguments for type: org.apache.kafka.clients.consumer.KafkaConsumer. Requires arguments: AbstractKafkaConsumerConfiguration consumerConfiguration
Можно получить ссылку на экземпляр KafkaConsumer в качестве параметра метода с аннотированными методами получения @Topic, как описано в Документация Micronaut Kafka , но это приведет к сохранению этой ссылки в качестве переменной экземпляра, получению доступа к ней через конечную точку HTTP и т. П. ... что звучит не очень убедительно: вы получаете ссылку ТОЛЬКО на KafkaConsumerпри получении следующего сообщения!Это может подходить для SUSPENDING / PAUSING, но не для RESUMING!
Кстати, вызов KafkaConsumer.resume (...) для ссылки, сохраненной как переменная экземпляра, дает
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2201)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2185)
at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1842)
[...]
Я думаю, что то же самое справедливо при реализации интерфейса KafkaConsumerAware для хранения ссылки на только что созданный экземпляр KafkaConsumer.
Так есть ли идеи, как справиться с этим надлежащим образом?
Спасибо
Кристиан