Минимальное количество извлеченных байтов, свойство не применяется в Spring-Kafka - PullRequest
1 голос
/ 28 апреля 2019

Это моя потребительская конфигурация:

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs(){
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "......MyEventDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "id");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "100000");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "2000");

        return props;
    }

    @Bean
    public ConsumerFactory<String, List<MyEvent>> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, List<MyEvent>>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, List<MyEvent>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ProducerFactory<String, List<MyEvent>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
}

Что важно, я устанавливаю конфигурацию fetch.min.bytes и fetch.max.wait.ms, и хотя я вижу эти свойства в журналепри запуске:

21:50:53.412 [main] INFO  o.a.k.c.c.ConsumerConfig () - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 2000
    fetch.min.bytes = 100000
    group.id = id
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    ...

это не влияет на размер полезной нагрузки, которая входит в каждое сообщение для потребителя.Когда я регистрирую размер byteArray, который поступает в десериализатор, он постоянен (около 20 КБ - объем одного сообщения, отправленного производителем) и не изменяется, несмотря на применение свойства fetch.min.bytes.Есть ли что-то, чего мне здесь не хватает, что я должен добавить, чтобы это работало?

1 Ответ

0 голосов
/ 28 апреля 2019

Я думаю, у вас есть основное недоразумение.Что вы подразумеваете под "... чтобы это работало"?

Эти свойства не влияют на отдельные сообщения.

Если ваши сообщения имеют размер 20000 байт, эти настройки означают poll() будет ждать 5 сообщений или 2 секунды, в зависимости от того, что наступит раньше.

Это не механизм "отфильтровывания" сообщений размером менее 10 Кб, если вы об этом думаете.

СSpring для Apache Kafka, вы можете использовать FilteringMessageListenerAdapter для фильтрации сообщений, которые вам не интересны.

Если вы используете @KafkaListener, добавьте RecordFilterStrategy к фабрике контейнеров, и адаптер будетавтоматически применяется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...