Это моя потребительская конфигурация:
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "......MyEventDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "100000");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "2000");
return props;
}
@Bean
public ConsumerFactory<String, List<MyEvent>> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, List<MyEvent>>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, List<MyEvent>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ProducerFactory<String, List<MyEvent>> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
}
Что важно, я устанавливаю конфигурацию fetch.min.bytes и fetch.max.wait.ms, и хотя я вижу эти свойства в журналепри запуске:
21:50:53.412 [main] INFO o.a.k.c.c.ConsumerConfig () - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 2000
fetch.min.bytes = 100000
group.id = id
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
...
это не влияет на размер полезной нагрузки, которая входит в каждое сообщение для потребителя.Когда я регистрирую размер byteArray, который поступает в десериализатор, он постоянен (около 20 КБ - объем одного сообщения, отправленного производителем) и не изменяется, несмотря на применение свойства fetch.min.bytes.Есть ли что-то, чего мне здесь не хватает, что я должен добавить, чтобы это работало?