fetch.min.bytes
Это свойство позволяет потребителю указывать минимальный объем данных, которые он хочет получить от посредника при получении записей. Если брокер получает запрос на получение записей от потребителя, но новые записи составляют меньше байтов, чем fetch.min.bytes, то брокер будет ждать до тех пор, пока не будет доступно больше сообщений, прежде чем отправлять записи обратно потребителю.
fetch.max.wait.ms
Он сообщит брокеру, что нужно подождать, пока у него будет достаточно данных для отправки, прежде чем отвечать потребителю.
Пример: Если вы установите fetch.max.wait.ms на 100 мс, а fetch.min.bytes на 1 МБ, Kafka получит запрос на выборку от потребителя и ответит данными либо при наличии 1МБ данных для возврата или через 100 мс, в зависимости от того, что произойдет раньше.
Выше двух параметров управления брокером при ответе на сообщение потребителю.
poll (timeout)
определяет, как долго будет блокироваться poll (), если данные недоступны в буфере потребителя.
*Опрос 1030 * - это запрос со стороны потребителя на получение записей, полученных от Брокера. Он вызывает fetchRecords () и, если записи уже доступны в брокере с удовлетворяющими выше параметрами fetch.min.bytes и fetch.max.wait.ms, он немедленно ответит, ожидая, пока заданное время ожидания вернется пустым, если в брокере нет доступных записей.
Ниже описаны методы pollForfetches в классе KafkaConsumer
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
final long startMs = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);
// if data is available already, return it immediately
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
client.poll(pollTimeout, startMs, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}
return fetcher.fetchedRecords();
}
, если fetch.min.bytes = 500 и fetch.max.wait.ms = 500, что означает, что брокер будет отвечать потребителю, когда у него есть 500байты данных для возврата или через 500 мсек, в зависимости от того, что произойдет раньше. Опрос на стороне потребителя будет каждые 200 мс вызывать fetchedRecords для получения любого сообщения, предоставленного брокером.