Я создал потребителя Kafka (с использованием интерфейса KafkaListener) с помощью стартера Spring Kafka. Я установил pollTimeout в containerFactory для моего потребителя. Я выставил 5 секунд. Когда я отправляю пакет из 100 сообщений сразу, я замечаю, что мой потребитель получит некоторые сообщения из пакета, но не все. Я ожидал, что потребитель будет ждать 5 секунд, ожидая дополнительных записей?
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
значения конфигурации, которые я устанавливаю:
kafka_consumer_concurrency: "1"
kafka_consumer_max_poll_interval_ms: "290000"
kafka_consumer_enable_auto_commit: "false"
kafka_consumer_request_timeout_ms: "300000"
kafka_consumer_fetch_max_wait_ms: "1000"
kafka_consumer_fetch_min_bytes: "50000000"
kafka_consumer_fetch_max_bytes: "52428800"
kafka_consumer_partition_fetch_max_bytes: "52428800"
kafka_consumer_poll_timeout_ms: "280000"
Оператор журнала запросов извлечения Kafka:
[DEBUG] 20:50:08.028 [-0-C-1] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=consumer_listener] Using older server API v6 to send FETCH
{replica_id=-1,max_wait_time=1000,min_bytes=50000000,max_bytes=52428800,isolation_level=0,topics=[{topic=test-topic,partitions=[{partition=1,fetch_offset=988786,log_start_offset=-1,partition_max_bytes=52428800},
{partition=0,fetch_offset=1619712,log_start_offset=-1,partition_max_bytes=52428800},
{partition=2,fetch_offset=989663,log_start_offset=-1,partition_max_bytes=52428800}]}]} with correlation id 832 to node 124
Точно такая же проблема сообщается здесь