Оригинальный ответ
Я не слишком знаком с методом serializedValueSize
, но согласно документации, это просто размер значения, хранящегося в этом сообщении. Это будет меньше, чем общий размер сообщения (даже с ключами null
), потому что сообщение также содержит метаданные (такие как отметка времени), которые не являются частью значения.
Что касается вашей проблемы: вместо того, чтобы напрямую управлять опросом, работая с размерами сообщений и ограничивая пропускную способность потребителя, почему бы просто не буферизовать входящие сообщения, пока их не станет достаточно или желаемый таймаут (вы упомянули fetch.max.wait.ms
, но Вы могли бы просто указать один вручную) прошло?
public static <K, V> List<ConsumerRecord<K, V>>
minPoll(KafkaConsumer<K, V> consumer, Duration timeout, int minRecords) {
List<ConsumerRecord<K, V>> acc = new ArrayList<>();
long pollTimeout = Duration.ofMillis(timeout.toMillis()/10);
long start = System.nanoTime();
do {
ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
for(ConsumerRecord<K, V> record : records)
acc.add(record);
} while(acc.size() < minRecords &&
System.nanoTime() - start < timeout.toNanos());
return acc;
}
Тайм-аут timeout.toMillis()/10
при вызове consumer.poll
является произвольным. Вы должны выбрать продолжительность, которая достаточно мала, чтобы не имело значения, если мы будем ждать это время дольше указанного времени ожидания (здесь: на 10% больше).
Редактировать: обратите внимание, что это может потенциально вернуть список, который больше max.poll.records
(максимальное значение max.poll.records + minRecords - 1
). Если вам также необходимо применить этот строгий верхний предел, либо используйте другой буфер, внешний по отношению к методу, для временного хранения лишних записей (что, вероятно, будет быстрее, но не позволит смешивать minPoll
и обычный метод poll
), либо просто отбросьте их и используйте метод consumer
seek
для возврата.
Ответ на обновленный вопрос
Таким образом, вопрос не столько в контроле количества сообщений, которые возвращаются методом poll
, сколько в том, как получить размер отдельной записи. К сожалению, я не думаю, что это возможно без большого количества проблем. Дело в том, что на этот вопрос нет реального (постоянного) ответа, и даже приблизительный ответ будет зависеть от версии Kafka или, скорее, от различных версий протокола Kafka.
Во-первых, я не совсем уверен, что именно контролирует max.partition.fetch.bytes
(как в: издержки протокола также являются его частью или нет?). Позвольте мне проиллюстрировать, что я имею в виду: когда потребитель отправляет запрос на выборку, ответ на выборку состоит из следующих полей:
- Время дроссельной заслонки (4 байта)
- Массив ответов темы (4 байта для длины массива + размер данных в массиве).
Ответ темы в свою очередь состоит из
- Название темы (2 байта для длины строки + размер строки)
- Массив ответов раздела (4 байта для длины массива + размер данных в массиве).
В ответе раздела есть
- ID раздела (4 байта)
- Код ошибки (2 байта)
- Верхний водяной знак (8 байт)
- Последнее стабильное смещение (8 байт)
- Смещение начала журнала (8 байт)
- Массив прерванных транзакций (4 байта для длины массива + данные в массиве)
- Набор записей.
Все это можно найти в файле FetchResponse.java
. Набор записей в свою очередь состоит из пакетов записей, которые содержат записи. Я не собираюсь перечислять все, что включает в себя пакет записей (вы можете увидеть это здесь ). Достаточно сказать, что объем служебной информации составляет 61 байт. Наконец, размер отдельной записи в пакете немного сложнее, поскольку он использует поля varint и varlong. Содержит
- Размер корпуса (1-5 байт)
- Атрибуты (1 байт)
- Метка времени дельта (1-10 байт)
- Смещение дельта (1-5 байт)
- массив байтов ключей (1-5 байтов + размер данных ключа)
- Массив байтов значений (1-5 байт + размер данных значений)
- Заголовки (1-5 байт + размер данных заголовков).
Исходный код для этого здесь . Как видите, вы не можете просто разделить 292 байта на два, чтобы получить размер записи, потому что некоторые служебные данные постоянны и не зависят от количества записей.
Что еще хуже, записи не имеют постоянного размера, даже если их ключи и значения (и заголовки) имеют значение, поскольку временная метка и смещение сохраняются как отличия от временной метки и смещения пакета с использованием типа данных переменной длины. Кроме того, это просто ситуация для самых последних версий протокола на момент написания этого. Для более старых версий ответ снова будет другим, и кто знает, что произойдет в будущих версиях.