Предотвращение небольших файлов из Kafka connect с использованием приемника HDFS-коннектора в распределенном режиме - PullRequest
0 голосов
/ 03 июля 2018

У нас есть тема с сообщениями со скоростью 1msg в секунду с 3 разделами, и я использую разъем HDFS для записи данных в HDFS в формате AVRo (по умолчанию), он генерирует файлы с размером в KBS, поэтому я попытался изменить следующие свойства в свойствах HDFS.

"flush.size": "5000", "rotate.interval.ms": "7200000"

но на выходе все еще небольшие файлы, поэтому мне нужно прояснить следующие моменты, чтобы решить эту проблему:

  1. является обязательным свойством flush.size, в случае, если мы не упомянем свойство flus.size, как происходит сброс данных?

  2. Если мы упоминаем размер сброса как 5000 и интервал вращения как 2 часа, он стирает данные каждые 2 часа в течение первых 3 интервалов, но после этого он случайным образом сбрасывает данные. Пожалуйста, найдите время файла создание( 19: 14,21: 14,23: 15, 01: 15,06: 59,08: 59 , 12: 40,14: 40) - выделены несовпадающие интервалы. Это из-за по поводу упомянутых свойств - это подводит меня к третьему вопросу.

  3. Каково предпочтение для сброса, если мы упомянем все нижеуказанные свойства (flush.size, rotate.interval.ms, rotate.schedule.interval.ms)

  4. Увеличение скорости передачи сообщений и сокращение раздела фактически показывает увеличение размера сбрасываемых данных. Это единственный способ контролировать небольшие файлы, как мы можем обрабатывать свойства, если Скорость ввода событий различна и нестабильна?

Было бы очень полезно, если бы вы могли поделиться документацией, касающейся обработки небольших файлов в kafka, подключенном через разъем HDFS, спасибо.

1 Ответ

0 голосов
/ 16 ноября 2018

Если вы используете TimeBasedPartitioner, и сообщения не всегда будут иметь растущие временные метки, то в результате вы получите файлы с одной записывающей задачей, которые будут выгружать файлы, когда он увидит сообщение с меньшей временной меткой в ​​интервале rotate.interval.ms чтения любой данной записи.

Если вы хотите иметь согласованные двухчасовые окна разделов, то вы должны использовать rotate.interval.ms=-1, чтобы отключить его, а затем rotate.schedule.interval.ms до некоторого разумного числа, которое находится в пределах окна продолжительности раздела.

например. у вас 7200 сообщений каждые 2 часа, и неясно, насколько велико каждое сообщение, но скажем, 1 МБ. Затем вы будете хранить ~ 7 ГБ данных в буфере, и вам нужно будет настроить размеры кучи Connect для хранения такого количества данных.

Порядок присутствия

  1. плановое вращение, начиная с верхней части часа
  2. размер сброса или «основанное на сообщении» вращение времени, в зависимости от того, что произойдет раньше, или есть запись, которая рассматривается как «до» начала текущей партии

И я считаю, что размер флеша обязателен для разъемов хранения


В целом, такие системы, как Uber's Hudi или предыдущий инструмент Camus Sweeper Kafka-HDFS, более приспособлены для обработки небольших файлов. Задачи Connect Sink Tasks заботятся только об использовании Kafka и записи в нисходящие системы; сама структура не распознает, Hadoop предпочитает файлы большего размера.

...