Чтение данных из Kafka с помощью Batch не работает должным образом с помощью SpringBoot - PullRequest
0 голосов
/ 26 сентября 2019

Я создал bean-компонент:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers().get(0));
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // maximum records per poll
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000");
    props.put("schema.registry.url", "http://localhost:8081");
    return props;
}

И добавил containerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // enable batch listening
    factory.setBatchListener(true);

     eturn factory;
}

Добавить слушателя:

@KafkaListener(id = "batch-listener", topics = "TestTopic")
public void receive(List<ConsumerRecord<String, GenericRecord>> record) {
     log.info("start of batch receive");
     for (int i = 0; i < record.size(); i++) {
         log.info("received message='{}'", record.get(i));
     }
     log.info("end of batch receive");
}

Я отправил в Kafka 1_000_000 записей.

В слушателе я ожидаю получить 100_000 записей за один раз, но я получаю около 20_000 записей: enter image description here

Но почему это происходит?Почему я не получаю 100_000 записей одновременно?

1 Ответ

1 голос
/ 26 сентября 2019

max.poll.records - это просто максимум.

Существуют и другие свойства, которые влияют на количество получаемых вами записей.

  • fetch.min.bytes - минимальный объем данных, который сервер должен вернутьдля запроса на получение.Если данных недостаточно, запрос будет ждать накопления такого количества данных, прежде чем ответить на запрос.Значение по умолчанию, равное 1 байту, означает, что на запросы выборки отвечают, как только один байт данных становится доступным или время запроса выборки истекает в ожидании поступления данных.Если для этого параметра задано значение, большее 1, сервер будет ожидать накопления больших объемов данных, что может немного повысить пропускную способность сервера за счет некоторой дополнительной задержки.
  • fetch.max.wait.ms - максимальное количество временисервер будет блокироваться перед ответом на запрос на выборку, если данных недостаточно для немедленного удовлетворения требованиям, заданным fetch.min.bytes.

См. документацию .

Невозможно точно контролировать минимальное количество записей (если они не имеют одинаковую длину).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...