Не в состоянии опросить / получить все записи из темы Кафка - PullRequest
0 голосов
/ 13 марта 2020

Я пытаюсь опросить данные из указанных c topi c, например, kafka получает 100 записей / с, но большую часть времени не получает все записи. Я использую тайм-аут как 5000 мс, и я вызываю этот метод каждые 100ms Примечание: я подписываюсь на указанные c topi c тоже

@ Scheduled (fixedDelayString = "100")

    public void pollRecords() {
        ConsumerRecords<String, String> records = 
        leadConsumer.poll("5000");

Как мне получить все данные с kafka?

1 Ответ

0 голосов
/ 13 марта 2020

Максимальное количество записей, возвращаемых функцией poll (), указывается с помощью max.poll.records параметра конфигурации потребителя. (по умолчанию 500) Кроме того, есть другие параметры конфигурации потребителя, которые ограничивают максимальный объем данных, возвращаемых с сервера. (fetch.max.bytes и max.partition.fetch.bytes)

С другой стороны, на стороне брокера есть еще одно ограничение размера, которое называется message.max.bytes.

Поэтому вы должны правильно установить эти параметры, чтобы получить больше сообщений.

Из документов Кафки ( ссылка ):

max.poll.records: Максимальное количество записей, возвращаемых в один вызов опроса (). (по умолчанию: 500)

fetch.max.bytes: Максимальный объем данных, которые сервер должен вернуть для запроса выборки. Записи извлекаются пакетами потребителем, и, если первая партия записей в первом непустом разделе выборки больше этого значения, пакет записей все равно будет возвращен, чтобы гарантировать, что потребитель может добиться прогресса. Таким образом, это не абсолютный максимум. Максимальный размер пакета записи, принимаемый брокером, определяется с помощью message.max.bytes (конфигурация брокера) или max.message.bytes (topi c config). Обратите внимание, что потребитель выполняет несколько выборок параллельно. (по умолчанию: 52428800)

message.max.bytes: Наибольший размер пакета записи, разрешенный Kafka. Если это значение увеличивается, и есть потребители старше 0.10.2, размер выборки потребителей также должен быть увеличен, чтобы они могли получать партии записей такого размера. В последней версии формата сообщений для эффективности записи всегда группируются в пакеты. В предыдущих версиях формата сообщений несжатые записи не группируются в пакеты, и в этом случае это ограничение применяется только к одной записи. Это может быть установлено для topi c с помощью конфигурации topi c level max.message.bytes. (по умолчанию: 1000012)

max.partition.fetch.bytes: Максимальный объем данных на раздел, который вернет сервер. Записи извлекаются партиями потребителем. Если первый пакет записей в первом непустом разделе выборки больше этого предела, пакет все равно будет возвращен, чтобы гарантировать, что потребитель может добиться прогресса. Максимальный размер пакета записи, принимаемый брокером, определяется через message.max.bytes (конфигурация брокера) или max.message.bytes (topi c config). Смотрите fetch.max.bytes для ограничения размера запроса потребителя. (по умолчанию: 1048576)

...