Конфигурирование потребителя Kafka в Spring Boot = контроль потерь над поведением потребителя? - PullRequest
0 голосов
/ 11 декабря 2018

Мне действительно нравится использовать все конфигурации, доступные в Spring Boot, для настройки потребителей Kafka:

ConsumerFactory() => KafkaListenerContainerFactory() => consume(V message)

Однако, похоже, я также теряю весь контроль над поведением потребителей, который поставляется с пакетом apache kafkaкак переключение между синхронизацией и асинхронной фиксацией, явный запуск потребителя и его полное отключение.С интерфейсом Spring kafka вам нужно реализовать только один метод, и вы просто начинаете получать сообщения:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeString(String message) {
    System.out.println("Consumed message: " + message);
}

Мне кажется, что все это потребление происходит в отдельном потоке, который Spring Bootпредоставляет автоматически ... Может кто-нибудь сказать мне, как я все еще могу сохранить весь этот контроль при использовании Spring Boot для настройки KafkaConsumers (и по тому же принципу KafkaProducers)?Спасибо!

1 Ответ

0 голосов
/ 12 декабря 2018

Мне кажется, что все это потребление происходит в отдельном потоке, который Spring Boot предоставляет автоматически

Это верно, Spring упаковывает потребителя kafka в контейнер слушателя.

Самое большее, вы можете настроить поведение через свойства контейнера .

Теоретически вы можете сохранить самое последнее значение, возвращаемое фабрикой потребителей kafka , но вам нужнопомнить, что для одного контейнера можно создать потребителя несколько раз.В любом случае, сохраняя ссылку, вы можете получить доступ к базовому потребителю кафки без рефлексии.Но это взлом, и потенциально вы можете повлиять на поведение контейнера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...