fetch.max.wait.ms против параметра для метода poll () - PullRequest
1 голос
/ 04 ноября 2019

Прежде чем задать свой вопрос, я хотел бы отметить, что подобный вопрос был задан здесь , но на него не было ответа, поэтому я задаю еще раз. Пожалуйста, не отмечайте это как дубликат, так как ранее упомянутый вопрос не имеет ответов.

У меня есть сомнения относительно fetch.max.wait.ms и consumer.poll(<value>). Это то, что я нашел в своем исследовании вышеупомянутых конфигов

Метод poll () принимает параметр timeout. Это указывает, сколько времени потребуется для возврата опроса, с данными или без данных

Если вы установите fetch.max.wait.ms на 100 мс и fetch.min.bytes на 1 МБ, Кафка получит запрос на выборкуот потребителя и ответит данными, когда у него будет 1 МБ данных для возврата или через 100 мс, в зависимости от того, что произойдет раньше.

Поэтому мой вопрос: что произойдет, когда fetch.max.wait.ms=500, consumer.poll(200) и fetch.min.bytes= 500 но у брокера недостаточно данных для возврата, как установлено fetch.min.bytes?

Ответы [ 2 ]

0 голосов
/ 05 ноября 2019

fetch.min.bytes

Это свойство позволяет потребителю указывать минимальный объем данных, которые он хочет получить от посредника при получении записей. Если брокер получает запрос на получение записей от потребителя, но новые записи составляют меньше байтов, чем fetch.min.bytes, то брокер будет ждать до тех пор, пока не будет доступно больше сообщений, прежде чем отправлять записи обратно потребителю.

fetch.max.wait.ms

Он сообщит брокеру, что нужно подождать, пока у него будет достаточно данных для отправки, прежде чем отвечать потребителю.

Пример: Если вы установите fetch.max.wait.ms на 100 мс, а fetch.min.bytes на 1 МБ, Kafka получит запрос на выборку от потребителя и ответит данными либо при наличии 1МБ данных для возврата или через 100 мс, в зависимости от того, что произойдет раньше.

Выше двух параметров управления брокером при ответе на сообщение потребителю.

poll (timeout)

определяет, как долго будет блокироваться poll (), если данные недоступны в буфере потребителя.

*Опрос 1030 * - это запрос со стороны потребителя на получение записей, полученных от Брокера. Он вызывает fetchRecords () и, если записи уже доступны в брокере с удовлетворяющими выше параметрами fetch.min.bytes и fetch.max.wait.ms, он немедленно ответит, ожидая, пока заданное время ожидания вернется пустым, если в брокере нет доступных записей.

Ниже описаны методы pollForfetches в классе KafkaConsumer

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
        final long startMs = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

        // if data is available already, return it immediately
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure

        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        client.poll(pollTimeout, startMs, () -> {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.rejoinNeededOrPending()) {
            return Collections.emptyMap();
        }

        return fetcher.fetchedRecords();
    }

, если fetch.min.bytes = 500 и fetch.max.wait.ms = 500, что означает, что брокер будет отвечать потребителю, когда у него есть 500байты данных для возврата или через 500 мсек, в зависимости от того, что произойдет раньше. Опрос на стороне потребителя будет каждые 200 мс вызывать fetchedRecords для получения любого сообщения, предоставленного брокером.

0 голосов
/ 04 ноября 2019

Из документов Сервер будет заблокирован, если недостаточно данных, предоставленных fetch.min.bytes. Таким образом, в вашем случае сервер будет ждать до 500 мс, если недостаточно данных

Приходит poll публичный опрос ConsumerRecords (длительное время ожидания) Согласно KafkaConsumer документам, поскольку существуетне достаточно опроса потребителей данных будет пустым для каждых 200 мс, пока у брокера не будет достаточно данных

timeout - Время в миллисекундах, потраченное на ожидание в опросе, если данные недоступны в буфере. Если 0, немедленно возвращается с любыми доступными в данный момент записями в буфере, иначе возвращает пустое. Не должно быть отрицательным.

fetch.max.wait.ms

Максимальное время, которое сервер будет блокировать до ответа на запрос выборки , если не хватает данных для немедленного удовлетворения требованиям, заданным fetch.min.bytes.

fetch.min.bytes

Минимальный объем данных, которые сервер должен вернуть для запроса выборки. Если данных недостаточно, запрос будет ждать накопления такого количества данных, прежде чем ответить на запрос. Значение по умолчанию, равное 1 байту, означает, что запросы на выборку отвечают, как только один байт данных становится доступным или время запроса на выборку истекает в ожидании поступления данных. Если задать для этого параметра значение, большее 1, сервер будет ожидать накопления больших объемов данных, что может немного повысить пропускную способность сервера за счет некоторой дополнительной задержки.

...