Мне действительно нравится использовать все конфигурации, доступные в Spring Boot, для настройки потребителей Kafka:
ConsumerFactory() => KafkaListenerContainerFactory() => consume(V message)
Однако, похоже, я также теряю весь контроль над поведением потребителей, который поставляется с пакетом apache kafkaкак переключение между синхронизацией и асинхронной фиксацией, явный запуск потребителя и его полное отключение.С интерфейсом Spring kafka вам нужно реализовать только один метод, и вы просто начинаете получать сообщения:
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeString(String message) {
System.out.println("Consumed message: " + message);
}
Мне кажется, что все это потребление происходит в отдельном потоке, который Spring Bootпредоставляет автоматически ... Может кто-нибудь сказать мне, как я все еще могу сохранить весь этот контроль при использовании Spring Boot для настройки KafkaConsumers (и по тому же принципу KafkaProducers)?Спасибо!