Я создал bean-компонент:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers().get(0));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// maximum records per poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000");
props.put("schema.registry.url", "http://localhost:8081");
return props;
}
И добавил containerFactory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// enable batch listening
factory.setBatchListener(true);
eturn factory;
}
Добавить слушателя:
@KafkaListener(id = "batch-listener", topics = "TestTopic")
public void receive(List<ConsumerRecord<String, GenericRecord>> record) {
log.info("start of batch receive");
for (int i = 0; i < record.size(); i++) {
log.info("received message='{}'", record.get(i));
}
log.info("end of batch receive");
}
Я отправил в Kafka 1_000_000 записей.
В слушателе я ожидаю получить 100_000 записей за один раз, но я получаю около 20_000 записей: ![enter image description here](https://i.stack.imgur.com/3gk0P.png)
Но почему это происходит?Почему я не получаю 100_000 записей одновременно?