Spring Kafka 2.2.2 Consumer возвращает только одну запись вместо max-poll-records - PullRequest
0 голосов
/ 23 июня 2019

Мы используем Spring Kafka 2.2.2 Release для извлечения записей из Kafka, используя @KafkaListener и с ConcurrentKafkaListenerContainerFactory. Мы настроили max-poll-records равным 5, однако он всегда дает потребителю только 1 запись в списке вместо 5 записей.

Хотя с такой же конфигурацией, он работает в Spring Kafka 2.1.4. Выпуск.

Вот наша конфигурация application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      enable-auto-commit: false
      max-poll-records: 5
      bootstrap-servers: localhost:9092
      group-id: group_id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.gap.cascade.li.data.xx.xx.CustomDeserialiser

Вот наш ConcurrentKafkaListenerContainerFactory:

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);

    return factory;
  }

Нам не хватает какой-либо конфигурации, которую необходимо выполнить для выпуска Spring Kafka 2.2.2?

1 Ответ

2 голосов
/ 23 июня 2019

Если у вас есть слушатель

@KafkaListener(...)
public void listen(List<...> data) {
    ...
}

Настройка factory.setBatchListener(true); должна работать для вас (если готово более одной записи).

Вы также можете использовать свойство загрузки

spring:
  kafka:
    listener:
      type: batch

сделать то же самое; избегая необходимости объявлять собственную фабрику.

Если вы включите ведение журнала DEBUG, контейнер будет записывать, сколько записей возвращено опросом. Вы также можете установить fetch.min.bytes и fetch.max.wait.ms, чтобы влиять на количество возвращаемых записей, если только одна сразу готова ...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        fetch.min.bytes: 10000
        fetch.max.wait.ms: 2000
    listener:
      type: batch

Кстати, текущая версия 2.2.x - 2.2.7 (boot 2.1.6).

...