как настроить потребительский параллелизм кафки с помощью весенней загрузки - PullRequest
0 голосов
/ 10 мая 2018

Я пишу приложение Kafka Consumer на основе Java. Я использую kafka-клиенты, Spring Kafka и Spring boot для своего приложения. Хотя загрузка Spring позволяет мне легко писать Kafka Consumers (без необходимости писать ConcurrentKafkaListenerContainerFactory, ConsumerFactory и т. Д.), Я хочу иметь возможность определять / настраивать некоторые свойства для этих потребителей. Однако я не смог найти простой способ сделать это с помощью Spring boot. Например: некоторые свойства, которые я бы заинтересовал в настройке, -

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

Я взглянул на предопределенные свойства Spring Boot здесь .

Кроме того, основываясь на предыдущем вопросе здесь , я хочу настроить параллелизм для потребителей, но не могу найти конфигурацию, основанную на application.properties, для этого с помощью Spring Boot.

Очевидным способом является определение классов ConcurrentKafkaListenerContainerFactory, ConsumerFactory снова в моем Spring Context и работа оттуда. Я хотел понять, есть ли более чистый способ сделать это, тем более что я использую Spring Boot.

Версии-

  • Кафка-клиенты - 0.10.0.0-SASL
  • spring-kafka - 1.1.0. РЕЛИЗ
  • пружинный ботинок - 1.5.10. РЕЛИЗ

Ответы [ 2 ]

0 голосов
/ 10 мая 2018

Я гуглил и нашел https://github.com/spring-projects/spring-kafka/issues/604. Проблема была закрыта со ссылкой https://docs.spring.io/spring-boot/docs/2.0.0.RELEASE/reference/htmlsingle/#boot-features-kafka-extra-props. Но это для весенней загрузки версии 2.0.

0 голосов
/ 10 мая 2018

По указанному вами URL прокрутите вниз до

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

spring-kafka - 1.1.0. РЕЛИЗ

Рекомендую обновить хотя бы до 1.3.5; благодаря KIP-62 она имеет гораздо более простую модель с резьбой.

EDIT

В Boot 2.0 вы можете установить произвольные свойства производителя, потребителя, администратора и общие свойства, как описано в документации по загрузке .

spring.kafka.consumer.properties.heartbeat.interval.ms

В Boot 1.5 есть только spring.kafka.properties, как описано здесь .

Это устанавливает свойства как для производителей, так и для потребителей, но вы можете увидеть некоторый шум в журнале о неиспользуемых / неподдерживаемых свойствах для производителя.

В качестве альтернативы, вы можете просто переопределить фабрику потребителей Boot и при необходимости добавить свойства ...

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProps = properties.buildConsumerProperties();
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
    return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...