Потребитель Kafka вернулся из опроса раньше, чем задано pollTimeout? - PullRequest
0 голосов
/ 26 мая 2020

Я создал потребителя Kafka (с использованием интерфейса KafkaListener) с помощью стартера Spring Kafka. Я установил pollTimeout в containerFactory для моего потребителя. Я выставил 5 секунд. Когда я отправляю пакет из 100 сообщений сразу, я замечаю, что мой потребитель получит некоторые сообщения из пакета, но не все. Я ожидал, что потребитель будет ждать 5 секунд, ожидая дополнительных записей?

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(pollTimeout);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setSyncCommits(true);
    return factory;
}

значения конфигурации, которые я устанавливаю:

kafka_consumer_concurrency: "1"
kafka_consumer_max_poll_interval_ms: "290000"
kafka_consumer_enable_auto_commit: "false"
kafka_consumer_request_timeout_ms: "300000"
kafka_consumer_fetch_max_wait_ms: "1000"
kafka_consumer_fetch_min_bytes: "50000000"
kafka_consumer_fetch_max_bytes: "52428800"
kafka_consumer_partition_fetch_max_bytes: "52428800"
kafka_consumer_poll_timeout_ms: "280000"

Оператор журнала запросов извлечения Kafka:

[DEBUG] 20:50:08.028 [-0-C-1] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=consumer_listener] Using older server API v6 to send FETCH 

{replica_id=-1,max_wait_time=1000,min_bytes=50000000,max_bytes=52428800,isolation_level=0,topics=[{topic=test-topic,partitions=[{partition=1,fetch_offset=988786,log_start_offset=-1,partition_max_bytes=52428800},

{partition=0,fetch_offset=1619712,log_start_offset=-1,partition_max_bytes=52428800},

{partition=2,fetch_offset=989663,log_start_offset=-1,partition_max_bytes=52428800}]}]} with correlation id 832 to node 124

Точно такая же проблема сообщается здесь

1 Ответ

1 голос
/ 26 мая 2020

Таймаут опроса - это просто время, в течение которого мы будем ждать, пока станет доступна хотя бы одна запись.

Нет poll.min.records, только fetch.min.bytes и fetch.max.wait.ms.

fetch.min.bytes

Минимальный объем данных, который сервер должен вернуть для запроса выборки. Если данных недостаточно, запрос будет ждать, пока накопится такой объем данных, прежде чем ответить на запрос. Значение по умолчанию, равное 1 байту, означает, что на запросы выборки ответят, как только станет доступен единственный байт данных или истечет время ожидания запроса на получение данных. Установка значения больше 1 приведет к тому, что сервер будет ждать накопления больших объемов данных, что может немного улучшить пропускную способность сервера за счет некоторой дополнительной задержки.

fetch.max.wait.ms

Максимальный период времени, в течение которого сервер будет блокировать перед ответом на запрос выборки, если данных недостаточно для немедленного удовлетворения требований, заданных fetch.min.bytes.

...