Интеграция Spring Boot и Reafor-Kafka's KafkaReceiver - PullRequest
3 голосов
/ 19 марта 2019

Я пытаюсь разработать приложение Spring Boot, используя библиотеку reactor-kafka, чтобы реагировать на некоторые сообщения, прочитанные в теме Кафки.

У меня есть класс конфигурации, который создает KafkaReceiver.

@Configuration
public class MyConfiguration {

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        // Options initialisation...
        final ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, string>create(props)
                               .subscription(Collections.singleton(consumer.getTopic()));
        return KafkaReceiver.create(receiverOptions);
    } 
}

Ну ... а сейчас? Используя не очень реактивную библиотеку spring-kafka, я могу аннотировать метод с помощью @KafkaListener, и Spring Boot создаст для меня поток, слушающий тему Кафки.

Где мне разместить KafkaReceiver вместо этого? Во всех примерах я нашел использование метода main, но это не способ загрузки .

Я использую Spring Boot 2.1.3 и Reactor-Kafka 1.1.0

Заранее спасибо.

1 Ответ

3 голосов
/ 19 марта 2019

Так как у вас есть этот KafkaReceiver боб, теперь вы можете делать это так:

@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
        return args -> {
                kafkaReceiver.receive()
                          ...
                          .sunbscribe();
        };
}

Этот боб ApplicationRunner будет запущен, когда ApplicationContext будет готов. См. Его JavaDocs для получения дополнительной информации.

...