Неожиданная зависимость cycli c при объединении пружинной-кафки и реакторной-кафки - PullRequest
0 голосов
/ 07 января 2020

Следующий код выдает неожиданную циклическую c зависимость между receiveOptions и шаблоном:

Удивительно, но это работает, если kafkaProps удаляются из контекста Spring.

Похоже, что некоторая автоконфигурация добавляет ненужная зависимость от шаблона до получателя.

. Пожалуйста, предложите правильный способ настройки ReactiveKafkaConsumerTemplate.

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Map<String, Object> kafkaProps() {
        return Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName()
        );
    }

    @Bean
    public ReceiverOptions<String, String> receiverOptions(Map<String, Object> kafkaProps) {
        return ReceiverOptions.<String, String>create(kafkaProps)
            .withKeyDeserializer(new StringDeserializer())
            .withValueDeserializer(new StringDeserializer())
            .subscription(List.of("test-topic"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> template(ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    @Bean
    public ApplicationRunner runner(ReactiveKafkaConsumerTemplate<String, String> kafkaReceiver) {
        return args -> kafkaReceiver.receive()
            .log()
            .subscribe();
    }
}

Полный проект можно найти по адресу https://github.com/piddubnyi/spring-reactor-kafka-ciclyc

Полная трассировка стека:

2020-01-07 16:29:51.031 DEBUG 24915 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : Application failed to start due to an exception

org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'receiverOptions': Requested bean is currently in creation: Is there an unresolvable circular reference?
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.beforeSingletonCreation(DefaultSingletonBeanRegistry.java:339) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:215) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1287) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1207) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:885) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:789) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:539) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.addCandidateEntry(DefaultListableBeanFactory.java:1503) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.findAutowireCandidates(DefaultListableBeanFactory.java:1467) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveMultipleBeans(DefaultListableBeanFactory.java:1386) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1245) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1207) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:885) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:789) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:539) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:879) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at com.example.demo.DemoApplication.main(DemoApplication.java:19) ~[main/:na]

1 Ответ

2 голосов
/ 07 января 2020

Полагаю, Spring не знает, какой Map<String, Object> ввести. Попробуйте добавить @Qualifier("kafkaProps") к параметру kafkaProps:

@Bean
public ReceiverOptions<String, String> receiverOptions(@Qualifier("kafkaProps") Map<String, Object> kafkaProps) {
    // ...
}

ОБНОВЛЕНИЕ

Странная вещь, после добавления @Qualifier Пружины kafkaProps в другое карта ...

Пока что, возможно, просто используйте Свойства вместо Карты:

@Bean
public Properties kafkaProps() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName());
    return props;
}

@Bean
public ReceiverOptions<String, String> receiverOptions(@Qualifier("kafkaProps") Properties kafkaProps) {
    // ...
}

ОБНОВЛЕНИЕ 2

Относительно странного Map<String, Object> инъекция: на самом деле это не так странно. Map<String, SomeType>, когда вводится по типу, обрабатывается особым образом: он собирает все бины типа MyType (ключи - это имена бинов). Так что в вашем случае Map<String, Object> - это карта всех компонентов в контексте, которая, очевидно, заканчивается круговой ссылкой. То же самое произошло бы для List<Object> или Set<Object>. Добавление аннотации @Qualifier не меняет это поведение, но фильтрует компоненты. Таким образом, карта kafkaProps является единственным компонентом, проходящим через фильтр, и кажется, что она «обернута» в другую карту.

Даже экземпляры типизированной карты могут быть автоматически подключены, пока ожидается тип ключа Строка. Значения карты содержат все bean-компоненты ожидаемого типа, а ключи содержат соответствующие имена bean-компонентов (...)

Переключение на внедрение по имени должно решить проблему. Обычно это можно сделать, добавив @Resource(name = "kafkaProps") вместо (неявной) аннотации @Autowired, но, к сожалению, аннотацию @Resource нельзя добавить к параметру метода.

Однако существует другое решение. Поскольку оба bean-компонента объявлены в одном и том же @Configuration классе, вы можете напрямую вызвать kafkaProps(). Spring будет возвращать один и тот же bean-компонент, а не новый экземпляр при каждом вызове (осторожно, он не поддерживается для методов stati c)

@Bean
public ReceiverOptions<String, String> receiverOptions() {
    return ReceiverOptions.<String, String>create(kafkaProps())
        ...
}

Ссылки:

...