Отказ
Этот ответ содержит в основном предположения. Детальное объяснение этого явления может потребовать углубленного анализа входных и выходных данных (или, по крайней мере, их соответствующих метаданных).
Наблюдения
- Энтропия эффективно ограничивает производительность самого сильного сжатия без потерь - Википедия - Энтропия (теория информации) .
Оба постоянные столбчатые форматы , а также внутреннее представление Spark SQL прозрачно применяют различные методы сжатия (например, Кодировка по длине прогона или * 1025). * словарная кодировка ) для уменьшения объема памяти хранимых данных.
Кроме того, форматы дисков (включая текстовые данные) могут быть явно сжаты с использованием алгоритмов сжатия общего назначения - неясно, так ли это здесь.
Сжатие (явное или прозрачное) применяется к блокам данных (обычно это разделы, но можно использовать меньшие единицы).
На основании 1), 2) и 3) можно предположить, что средняя степень сжатия будет зависеть от распределения данных в кластере. Также следует отметить, что конечный результат может быть недетерминированным, если линия вверх по течению содержит широкие преобразования.
Возможное воздействие coalesce
против repartition
:
В общем случае coalesce
могут принимать два пути:
- Эскалация по конвейеру до источника - самый распространенный сценарий.
- Распространение до ближайшего шаффла.
В первом случае мы можем ожидать, что степень сжатия будет сопоставима со степенью сжатия ввода. Однако есть некоторые случаи, когда можно добиться гораздо меньшего конечного результата. Давайте представим вырожденный набор данных:
val df = sc.parallelize(
Seq("foo", "foo", "foo", "bar", "bar", "bar"),
6
).toDF
Если такой набор данных был записан на диск, не было бы возможности для сжатия - каждое значение должно быть записано как есть:
df.withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
| foo| 0|
| foo| 1|
| foo| 2|
| bar| 3|
| bar| 4|
| bar| 5|
+-----+---+
Другими словами, нам нужно примерно 6 * 3 байта, что дает всего 18 байтов.
Однако, если мы объединимся
df.coalesce(2).withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
| foo| 0|
| foo| 0|
| foo| 0|
| bar| 1|
| bar| 1|
| bar| 1|
+-----+---+
мы можем, например, применить RLE с маленьким int в качестве счетчика и сохранить каждый раздел 3 + 1 байт, что дает всего 8 байт.
Это, конечно, огромное упрощение, но показывает, как сохранение структуры входов с низкой энтропией и объединение блоков может привести к уменьшению объема используемой памяти.
Второй coalesce
сценарий менее очевиден, но есть сценарии, в которых энтропия может быть уменьшена с помощью восходящего процесса (подумайте, например, о оконных функциях), и сохранение такой структуры будет полезным.
А как же repartition
?
Без выражения разделения repartition
применяется RoundRobinPartitioning
(реализовано как HashPartitioning
с псевдослучайным ключом на основе идентификатора раздела). Пока хеш-функция ведет себя разумно, такое перераспределение должно максимизировать энтропию данных и, как следствие, снизить возможную степень сжатия.
Заключение
coalesce
не должен обеспечивать какие-либо конкретные преимущества в одиночку, но может сохранять существующие свойства распределения данных - в некоторых случаях это свойство может быть полезным.
repartition
, в силу своей природы, усугубляет ситуацию, если энтропия данных уже не максимизирована (сценарий, в котором улучшения возможны, но крайне маловероятен для нетривиального набора данных).
Наконец, repartition
с выражением разделения или repartitionByRange
должно уменьшить энтропию и улучшить степень сжатия.
Примечание
Следует также помнить, что столбчатые форматы обычно выбирают конкретный метод сжатия / кодирования (или его отсутствие) на основе статистики времени выполнения. Таким образом, даже если набор строк в определенном блоке является фиксированным, но порядок строк изменяется, мы можем наблюдать различные результаты.