Весенняя Кафка с Динамическим @KafkaListener - PullRequest
1 голос
/ 02 апреля 2019

Я использую Spring Boot 2.x с spring-kafka (не spring-интеграция-kafka )

У меня есть несколько бинов, помеченных @KafkaListener ... каждая из которых использует одну тему ... так как у меня 12 тем, мне также нужно иметь 12 бинов KafkaConsumers ... и я хотел бы знать, могу ли я создавать эти бины программно / динамически ... возможноиспользуя KafkaListenerEndpointRegistry для динамического создания потребительских контейнеров.

Примечание: мне нужно использовать сообщения в пакетном режиме ... так что, возможно, я могу использовать BatchMessageListener ?

Текущий код:

@KafkaListener(
        id = COUNTRY,
        containerFactory = KAFKA_LISTENER_FACTORY_BEAN_NAME,
        topics = {TOPIC},
        groupId = GROUP_ID,
        clientIdPrefix = CLIENT_ID,
        errorHandler = VALIDATION_ERROR_HANDLER_BEAN_NAME
    )
    @Override
    public void consume(final List<MessageDTO> messages,
        @Header(KafkaHeaders.RECEIVED_TOPIC) final List<String> topics,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final List<String> messagesKey,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitionIds,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final List<Long> timestamps,
        @Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
            (...)
    }

Каждый потребитель темы имеет свою реализацию в зависимости от темы.Ребята, не могли бы вы указать мне на блог / псевдокод / ​​ветку git / ответ, пожалуйста?

Ответы [ 2 ]

1 голос
/ 03 апреля 2019

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/support/GenericApplicationContext.html#registerBean-java.lang.Class-java.util.function.Supplier-org.springframework.beans.factory.config.BeanDefinitionCustomizer...-

Создайте свой объект и зарегистрируйте его как компонент, предоставив его через поставщика в вышеупомянутом методе.Spring будет запускать постпроцессоры бина, необходимые для настройки всего.

0 голосов
/ 05 апреля 2019

Если в ваших темах есть какой-то шаблон, вы можете попробовать и эту:

      kafka:
        bindings:
            input.consumer.destination-is-pattern: true
...