KafkaMessageListenerContainer против ConcurrentMessageListenerContainer - PullRequest
1 голос
/ 08 октября 2019

Если я использую свойство listener.concurrency, будет ли это использовать KafkaMessageListenerContainer или ConcurrentMessageListenerContainer? Заметьте, что я не определил никакие bean-компоненты явно, и я оставляю необходимое создание bean-компонента для весенней загрузки.

Я хотел знать, правильно ли мое понимание. Я хотел бы, чтобы параллелизм использовался в сообщении из темы, в которой несколько разделов говорят на 3. Так что все в порядке, если я сконфигурирую его в файле application.yml следующим образом. Я попробовал приведенную ниже конфигурацию и увидел, что были созданы 3 потребителя.

spring:
   kafka:
     consumer:
        group-id: group-id
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
     listener:
        concurrency: 3 

Итак, мой вопрос: нужно ли создавать bean-компонент для настройки ConcurrentMessageListenerContainer, или для конфигурации вышеупомянутого слушателя внутренне будет использоваться ConcurrentMessageListenerContainer вместо KafkaMessageListenerContainer, когда он видит listener.concurrency=3?

1 Ответ

0 голосов
/ 09 октября 2019
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

Concurrency является свойством ConcurrentMessageListenerContainer. Если мы определяем это свойство в application.yml, это означает, что мы хотим использовать ConcurrentMessageListenerContainer.

enter image description here

В Kafka только одному потребителю разрешено читать данные из одного раздела за раз. Использование KafkaMessageListenerContainer обеспечит только приемник прослушивателя однопоточных сообщений. KafkaMessageListenerContainer

Использование ConcurrentMessageListenerContainer предоставит число KafkaMessageListenerContainer, определенное в свойстве Concurrency в application.yml.

Мы можем получить то же поведение, используяследующие Listener using Java code.

@Configuration
@EnableKafka
public class KafkaListenerConfiguration {

    @Autowired
    private Environment environment;

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1111");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

Я бы предложил вам перейти по ссылке ниже, чтобы иметь практический пример того же.

https://howtoprogram.xyz/2016/09/25/spring-kafka-multi-threaded-message-consumption/

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...