Как реализовать конечную точку HTTP в Micronaut, чтобы приостановить потребление сообщений от Kafka? - PullRequest
0 голосов
/ 28 февраля 2019

позвольте мне описать обоснование моего вопроса:

  • У нас есть приложение на базе Micronaut, принимающее сообщения от брокера Kafka.
  • Использованные сообщения обрабатываются и передаются на другой пульт.«нисходящее» приложение.
  • Если это нисходящее приложение намеренно перезапустится, потребуется некоторое время, чтобы подготовиться к принятию дальнейших сообщений от нашего приложения на базе Micronaut.

Итак, мыесть идея отправить в приложение Micronaut запрос на SUSPEND / PAUSE потребление сообщений от Kafka (например, через HTTP на соответствующую конечную точку).Интерфейс KafkaConsumer, похоже, имеет соответствующие методы для достижения этой цели, например

public void pause​(java.util.Collection<TopicPartition> partitions)
public void resume​(java.util.Collection<TopicPartition> partitions)

Но как получить ссылку на соответствующий экземпляр KafkaConsumer, поданный в нашу конечную точку HTTP?

Мы пыталисьчтобы вставить его в конструктор класса конечной точки / контроллера HTTP, но это дает

Error instantiating bean of type  [HttpController]
Message: Missing bean arguments for type: org.apache.kafka.clients.consumer.KafkaConsumer. Requires arguments: AbstractKafkaConsumerConfiguration consumerConfiguration

Можно получить ссылку на экземпляр KafkaConsumer в качестве параметра метода с аннотированными методами получения @Topic, как описано в Документация Micronaut Kafka , но это приведет к сохранению этой ссылки в качестве переменной экземпляра, получению доступа к ней через конечную точку HTTP и т. П. ... что звучит не очень убедительно: вы получаете ссылку ТОЛЬКО на KafkaConsumerпри получении следующего сообщения!Это может подходить для SUSPENDING / PAUSING, но не для RESUMING!

Кстати, вызов KafkaConsumer.resume (...) для ссылки, сохраненной как переменная экземпляра, дает

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2201)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2185)
    at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1842)
    [...]

Я думаю, что то же самое справедливо при реализации интерфейса KafkaConsumerAware для хранения ссылки на только что созданный экземпляр KafkaConsumer.

Так есть ли идеи, как справиться с этим надлежащим образом?

Спасибо

Кристиан

...