Почему метод repartition () увеличивает размер файла на диске? - PullRequest
0 голосов
/ 16 января 2019

Озеро данных, с которым я работаю (df), имеет 2 ТБ данных и 20 000 файлов. Я хотел бы сжать набор данных в 2000 файлов по 1 ГБ.

Если вы запускаете df.coalesce(2000) и записываете на диск, озеро данных содержит 1,9 ТБ данных.

Если вы запускаете df.repartition(2000) и записываете на диск, озеро данных содержит 2,6 ТБ данных.

Каждый файл в озере данных repartition() на 0,3 ГБ больше, чем ожидалось (это все файлы 1,3 ГБ вместо файлов 1 ГБ).

Почему метод repartition() увеличивает размер общего озера данных?

Существует связанный вопрос , в котором объясняется, почему размер озера данных увеличивается после выполнения агрегации. Ответ гласит:

В общем, форматы столбчатых хранилищ, такие как Parquet, очень чувствительны, когда речь идет о распределении данных (организации данных) и количестве элементов в отдельных столбцах. Чем организованнее данные и чем меньше количество элементов, тем эффективнее хранилище.

Является ли coalesce() алгоритм, обеспечивающий более организованные данные ... Я так не думаю ...

Не думаю, что другой вопрос отвечает на мой вопрос.

1 Ответ

0 голосов
/ 16 января 2019

Отказ

Этот ответ содержит в основном предположения. Детальное объяснение этого явления может потребовать углубленного анализа входных и выходных данных (или, по крайней мере, их соответствующих метаданных).

Наблюдения

  1. Энтропия эффективно ограничивает производительность самого сильного сжатия без потерь - Википедия - Энтропия (теория информации) .
  2. Оба постоянные столбчатые форматы , а также внутреннее представление Spark SQL прозрачно применяют различные методы сжатия (например, Кодировка по длине прогона или * 1025). * словарная кодировка ) для уменьшения объема памяти хранимых данных.

    Кроме того, форматы дисков (включая текстовые данные) могут быть явно сжаты с использованием алгоритмов сжатия общего назначения - неясно, так ли это здесь.

  3. Сжатие (явное или прозрачное) применяется к блокам данных (обычно это разделы, но можно использовать меньшие единицы).

  4. На основании 1), 2) и 3) можно предположить, что средняя степень сжатия будет зависеть от распределения данных в кластере. Также следует отметить, что конечный результат может быть недетерминированным, если линия вверх по течению содержит широкие преобразования.

Возможное воздействие coalesce против repartition:

В общем случае coalesce могут принимать два пути:

  • Эскалация по конвейеру до источника - самый распространенный сценарий.
  • Распространение до ближайшего шаффла.

В первом случае мы можем ожидать, что степень сжатия будет сопоставима со степенью сжатия ввода. Однако есть некоторые случаи, когда можно добиться гораздо меньшего конечного результата. Давайте представим вырожденный набор данных:

val df = sc.parallelize(
  Seq("foo", "foo", "foo", "bar", "bar", "bar"),
  6 
).toDF

Если такой набор данных был записан на диск, не было бы возможности для сжатия - каждое значение должно быть записано как есть:

df.withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  1|
|  foo|  2|
|  bar|  3|
|  bar|  4|
|  bar|  5|
+-----+---+

Другими словами, нам нужно примерно 6 * 3 байта, что дает всего 18 байтов.

Однако, если мы объединимся

df.coalesce(2).withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  0|
|  foo|  0|
|  bar|  1|
|  bar|  1|
|  bar|  1|
+-----+---+

мы можем, например, применить RLE с маленьким int в качестве счетчика и сохранить каждый раздел 3 + 1 байт, что дает всего 8 байт.

Это, конечно, огромное упрощение, но показывает, как сохранение структуры входов с низкой энтропией и объединение блоков может привести к уменьшению объема используемой памяти.

Второй coalesce сценарий менее очевиден, но есть сценарии, в которых энтропия может быть уменьшена с помощью восходящего процесса (подумайте, например, о оконных функциях), и сохранение такой структуры будет полезным.

А как же repartition?

Без выражения разделения repartition применяется RoundRobinPartitioning (реализовано как HashPartitioning с псевдослучайным ключом на основе идентификатора раздела). Пока хеш-функция ведет себя разумно, такое перераспределение должно максимизировать энтропию данных и, как следствие, снизить возможную степень сжатия.

Заключение

coalesce не должен обеспечивать какие-либо конкретные преимущества в одиночку, но может сохранять существующие свойства распределения данных - в некоторых случаях это свойство может быть полезным.

repartition, в силу своей природы, усугубляет ситуацию, если энтропия данных уже не максимизирована (сценарий, в котором улучшения возможны, но крайне маловероятен для нетривиального набора данных).

Наконец, repartition с выражением разделения или repartitionByRange должно уменьшить энтропию и улучшить степень сжатия.

Примечание

Следует также помнить, что столбчатые форматы обычно выбирают конкретный метод сжатия / кодирования (или его отсутствие) на основе статистики времени выполнения. Таким образом, даже если набор строк в определенном блоке является фиксированным, но порядок строк изменяется, мы можем наблюдать различные результаты.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...