Понимание размера байта сообщения Кафки - PullRequest
2 голосов
/ 19 июня 2019

Как мне получить размер одной записи в Kafka?

Существует некоторая информация о том, зачем мне это нужно.

Похоже, это не serializedValueSize, представленный в ConsumerRecordили классы RecordMetadata.Я не совсем понимаю значение этого свойства, поскольку оно не соответствует размеру сообщения, полезного для потребителя.Для чего используется serializedValueSize, если не это?

Я пытаюсь заставить мое Java-приложение Kafka вести себя как "min.poll.records", если оно существовало в дополнение к "max.poll.records".Я должен сделать это, потому что это требуется :).Предполагая, что все сообщения по данной теме имеют одинаковый размер (что в данном случае верно), это должно быть возможно со стороны потребителя, если установить fetch.min.bytes равным количеству сообщений для пакетной обработки, умноженному на размер каждого байта.message.

Существует:

https://kafka.apache.org/documentation/#consumerapi

max.poll.records

Максимальное количество записей, возвращаемых за один вызовto poll ().

Этого не существует, но мне нужно поведение:

min.poll.records

Минимальное числозаписей, возвращенных в одном вызове poll ().Если до истечения времени, указанного в fetch.max.wait.ms, доступно недостаточно записей, то записи все равно возвращаются, и, следовательно, это не абсолютный минимум.

Вот что яДо сих пор нашел:

  • На стороне производителя у меня "batch.size" установлен в 1 байт.Это вынуждает производителя отправлять каждое сообщение отдельно.

  • Для размера потребителя у меня установлено значение "max.partition.fetch.bytes", равное 291 байту.Это заставляет потребителя получать только одно сообщение.Установка этого значения в 292 заставляет потребителя иногда возвращать 2 сообщения.Итак, я посчитал размер сообщения равным половине 292; Размер одного сообщения составляет 146 байт. .

  • Приведенные выше маркеры требуют изменения конфигурации Kafka и включают в себя просмотр / просмотр некоторых журналов сервера вручную.Было бы замечательно, если бы API Java Kafka предоставил это значение.

  • На стороне производителя Kafka предоставляет способ получить сериализованные размеры для записи в RecordMetadata.Метод serializedValueSize .Это значение составляет 76 байт, что значительно отличается от 146 байт, приведенных в приведенном выше тесте.

  • Для потребительского размера Kafka предоставляет ConsumerRecord API .Размер сериализованного значения из этой записи также равен 76. Смещение просто увеличивается на единицу каждый раз (не на размер байта записи).

  • Размер ключа равен -1байт (ключ нулевой).

System.out.println(myRecordMetadata.serializedValueSize());
// 76
# producer
batch.size=1

# consumer

# Expected this to work:
# 76 * 2 = 152
max.partition.fetch.bytes=152

# Actually works:
# 292 = ??? magic ???
max.partition.fetch.bytes=292

Я ожидал, что установка max.partition.fetch.bytes кратна числу байтов, заданных serializedValueSizeзаставит потребителя Kafka получить максимальное количество записей из опроса.Вместо этого значение max.partition.fetch.bytes должно быть намного выше, чтобы это произошло.

1 Ответ

2 голосов
/ 20 июня 2019

Оригинальный ответ

Я не слишком знаком с методом serializedValueSize, но согласно документации, это просто размер значения, хранящегося в этом сообщении. Это будет меньше, чем общий размер сообщения (даже с ключами null), потому что сообщение также содержит метаданные (такие как отметка времени), которые не являются частью значения.

Что касается вашей проблемы: вместо того, чтобы напрямую управлять опросом, работая с размерами сообщений и ограничивая пропускную способность потребителя, почему бы просто не буферизовать входящие сообщения, пока их не станет достаточно или желаемый таймаут (вы упомянули fetch.max.wait.ms, но Вы могли бы просто указать один вручную) прошло?

public static <K, V> List<ConsumerRecord<K, V>>
    minPoll(KafkaConsumer<K, V> consumer, Duration timeout, int minRecords) {
  List<ConsumerRecord<K, V>> acc = new ArrayList<>();
  long pollTimeout = Duration.ofMillis(timeout.toMillis()/10);
  long start = System.nanoTime();
  do {
    ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
    for(ConsumerRecord<K, V> record : records)
      acc.add(record);
  } while(acc.size() < minRecords &&
          System.nanoTime() - start < timeout.toNanos());
  return acc;
}

Тайм-аут timeout.toMillis()/10 при вызове consumer.poll является произвольным. Вы должны выбрать продолжительность, которая достаточно мала, чтобы не имело значения, если мы будем ждать это время дольше указанного времени ожидания (здесь: на 10% больше).

Редактировать: обратите внимание, что это может потенциально вернуть список, который больше max.poll.records (максимальное значение max.poll.records + minRecords - 1). Если вам также необходимо применить этот строгий верхний предел, либо используйте другой буфер, внешний по отношению к методу, для временного хранения лишних записей (что, вероятно, будет быстрее, но не позволит смешивать minPoll и обычный метод poll), либо просто отбросьте их и используйте метод consumer seek для возврата.

Ответ на обновленный вопрос

Таким образом, вопрос не столько в контроле количества сообщений, которые возвращаются методом poll, сколько в том, как получить размер отдельной записи. К сожалению, я не думаю, что это возможно без большого количества проблем. Дело в том, что на этот вопрос нет реального (постоянного) ответа, и даже приблизительный ответ будет зависеть от версии Kafka или, скорее, от различных версий протокола Kafka.

Во-первых, я не совсем уверен, что именно контролирует max.partition.fetch.bytes (как в: издержки протокола также являются его частью или нет?). Позвольте мне проиллюстрировать, что я имею в виду: когда потребитель отправляет запрос на выборку, ответ на выборку состоит из следующих полей:

  1. Время дроссельной заслонки (4 байта)
  2. Массив ответов темы (4 байта для длины массива + размер данных в массиве).

Ответ темы в свою очередь состоит из

  1. Название темы (2 байта для длины строки + размер строки)
  2. Массив ответов раздела (4 байта для длины массива + размер данных в массиве).

В ответе раздела есть

  1. ID раздела (4 байта)
  2. Код ошибки (2 байта)
  3. Верхний водяной знак (8 байт)
  4. Последнее стабильное смещение (8 байт)
  5. Смещение начала журнала (8 байт)
  6. Массив прерванных транзакций (4 байта для длины массива + данные в массиве)
  7. Набор записей.

Все это можно найти в файле FetchResponse.java. Набор записей в свою очередь состоит из пакетов записей, которые содержат записи. Я не собираюсь перечислять все, что включает в себя пакет записей (вы можете увидеть это здесь ). Достаточно сказать, что объем служебной информации составляет 61 байт. Наконец, размер отдельной записи в пакете немного сложнее, поскольку он использует поля varint и varlong. Содержит

  1. Размер корпуса (1-5 байт)
  2. Атрибуты (1 байт)
  3. Метка времени дельта (1-10 байт)
  4. Смещение дельта (1-5 байт)
  5. массив байтов ключей (1-5 байтов + размер данных ключа)
  6. Массив байтов значений (1-5 байт + размер данных значений)
  7. Заголовки (1-5 байт + размер данных заголовков).

Исходный код для этого здесь . Как видите, вы не можете просто разделить 292 байта на два, чтобы получить размер записи, потому что некоторые служебные данные постоянны и не зависят от количества записей.

Что еще хуже, записи не имеют постоянного размера, даже если их ключи и значения (и заголовки) имеют значение, поскольку временная метка и смещение сохраняются как отличия от временной метки и смещения пакета с использованием типа данных переменной длины. Кроме того, это просто ситуация для самых последних версий протокола на момент написания этого. Для более старых версий ответ снова будет другим, и кто знает, что произойдет в будущих версиях.

...