Получить сжатый Kafka размер сообщения - PullRequest
0 голосов
/ 09 мая 2018

Хотелось бы узнать сжатый размер сообщения в кафке.

Я использую kafka 1.1.0 и java kafka-connect 1.1.0 для отправки сообщений от моего производителя в тему.

Если сообщение слишком велико для моего производителя, я получаю

При сериализации сообщение имеет размер xxx байт, который превышает максимальный размер запроса, настроенный вами в конфигурации max.request.size.

Установка для max.request.size подходящего значения приводит к сообщению об ошибке от брокера, так как message.max.bytes также должен быть соответствующим образом скорректирован в конфигурации брокера. К сожалению, сообщение об ошибке не включает размер сообщения, полученного брокером. Я настроил message.max.bytes. Пока все хорошо.

Если я активирую сжатие на стороне производителя, max.request.size по-прежнему должен иметь тот же размер, что и без сжатия, поскольку код, к сожалению, сравнивает размер несжатого сообщения перед его сжатием (см. https://issues.apache.org/jira/browse/KAFKA-4169)

Но со сжатием я смогу уменьшить message.max.bytes в брокере. Проблема в том, что ни в коем случае я не могу определить размер этого сжатого сообщения. Есть ли способ выяснить это либо в коде производителя перед отправкой сообщения, либо позже в файлах журнала?

В моем случае со сжатием достаточно значения по умолчанию 1 МБ для message.max.bytes, поэтому мне не нужно изменять конфигурацию по умолчанию. Но я хотел бы знать, находится ли мое сжатое сообщение ниже 1 МБ или, может быть, просто 0,99 МБ. В этом случае я мог бы увеличить message.max.bytes на производстве, чтобы избежать проблем.

Спасибо за вашу поддержку заранее.

1 Ответ

0 голосов
/ 18 января 2019

Что вы можете сделать, это использовать библиотеку сжатия, самостоятельно сжать сообщение, проверить его размер перед отправкой.Например, если вы используете сжатие lz4, вы можете использовать lz4-java lib, а затем что-то вроде:

private static LZ4Compressor COMPRESS = LZ4Factory.fastestInstance().highCompressor();

String meMessageString      = "My Message that I am sending to kafka";
byte[] uncompressedBytes    = jsonRequest.getBytes();
long lz4compressedLength    = COMPRESSOR.compress(uncompressedBytes).length;
...