Проблема с размером пакета в MapR Streams Kafka API - PullRequest
0 голосов
/ 22 января 2020

Здравствуйте, я использую Kafka MapRStream для получения событий из потоков Mapr. Topi c.

Я пытаюсь увеличить размер пакета моего потребителя, но я не получаю больше чем 30 сообщений в один пакет !

Одно событие имеет размер около 5000 байт. Если событие меньше, я получаю больше за один пакет.

Вот моя конфигурация Потребитель:

public static void main( String[] args ) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batchSize");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 26214400);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 100 * 1024 * 1024);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);


        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        long totalCount = 0;
        long start = System.currentTimeMillis();
        long countTimesNoMessages = 0;

        while (countTimesNoMessages < 10) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            totalCount += records.count();
            System.out.println(records.count());
            if (records.count() == 0) {
                countTimesNoMessages++;
            }
        }

        long end = System.currentTimeMillis();
        System.out.println((end - start) + " for " + totalCount + " messages");
    } 

1 Ответ

0 голосов
/ 22 января 2020

Это возможные точки конфигурации.

https://mapr.com/docs/61/MapR_Streams/configuration-parameters.html

обратите внимание, что fetch.max.bytes - это максимальное значение, а sum(max.partition.fetch.bytes) по всем разделам не может go over fetch.max.bytes.

Нормально настроить max.partition.fetch.bytes, чтобы из каждого раздела опрашивалось более 64 КБ (по умолчанию), а также настраивается fetch.max.bytes, чтобы он мог работать max.partition.fetch.bytes

Не следует устанавливать слишком большой размер пакета. Как только частота запросов на опрос потока упадет ниже нескольких сотен в секунду или около того, вы вряд ли получите дополнительные улучшения в производительности и гораздо чаще будете иметь проблемы с горячими точками или большим количеством повторной работы в случае сбоя потоков .

...