pyspark: эффективно разделите запись по тому же количеству разделов, что и исходная таблица - PullRequest
0 голосов
/ 09 июня 2018

У меня был вопрос, связанный с функцией repartitionBy() pyspark, который я первоначально разместил в комментарии к этому вопросу .Меня попросили опубликовать его как отдельный вопрос, поэтому вот он:

Я понимаю, что df.partitionBy(COL) запишет все строки с каждым значением COL в свою папку, и что каждая папка будет(при условии, что строки были ранее распределены по всем разделам с помощью какого-либо другого ключа), примерно столько же файлов, сколько было ранее во всей таблице.Я нахожу это поведение раздражающим.Если у меня есть большая таблица с 500 разделами, и я использую partitionBy(COL) в некоторых столбцах атрибутов, то теперь у меня есть, например, 100 папок, в которых каждая содержит 500 (теперь очень маленьких) файлов.

То, что я хотел бы, это поведение partitionBy(COL), но с примерно тем же размером и количеством файлов, что и у меня изначально.

В качестве демонстрации в предыдущем вопросе приводится игрушечный пример, где у вас есть таблица с10 разделов и сделайте partitionBy(dayOfWeek), и теперь у вас есть 70 файлов, потому что в каждой папке их 10.Я хотел бы ~ 10 файлов, по одному на каждый день, и, возможно, 2 или 3 на дни, которые имеют больше данных.

Можно ли это легко сделать?Что-то вроде df.write().repartition(COL).partitionBy(COL) может показаться работоспособным, но я беспокоюсь о том, что (в случае очень большой таблицы, которая собирается разбить на несколько папок) необходимо сначала объединить ее с небольшим количеством разделов перед делать partitionBy(COL) кажется плохой идеей.

Любые предложения приветствуются!

1 Ответ

0 голосов
/ 12 июня 2018

У вас есть несколько вариантов.В моем коде ниже я предполагаю, что вы хотите писать в паркет, но, конечно, вы можете изменить это.

(1) df.repartition (numPartitions, * cols) .write.partitionBy (* cols).parquet (writePath)

Сначала будет использовано разбиение на основе хеша, чтобы гарантировать, что ограниченное число значений из COL попадет в каждый раздел.В зависимости от значения, выбранного для numPartitions, некоторые разделы могут быть пустыми, в то время как другие могут быть переполнены значениями - для тех, кто не знает почему, прочитайте это .Затем, когда вы вызываете partitionBy для DataFrameWriter, каждое уникальное значение в каждом разделе будет помещено в его отдельный файл.

Предупреждение: этот подход может привести к разным размерам разделов и выполнению односторонних задачtimes. Это происходит, когда значения в вашем столбце связаны со многими строками (например, столбец города - в файле для Нью-Йорка может быть много строк), тогда как другие значения менее многочисленны (например, значения для маленькихtowns).

(2) df.sort (sortCols) .write.parquet (writePath)

Эта опция прекрасно работает, когда вы хотите, чтобы (1) записываемые вами файлы были почти равныразмеры (2) точный контроль над количеством записанных файлов.Этот подход сначала глобально сортирует ваши данные, а затем находит разбиения, которые разбивают данные на k разделы равномерного размера, где k указано в конфигурации config spark.sql.shuffle.partitions.Это означает, что все значения с одинаковыми значениями вашего ключа сортировки смежны друг с другом, но иногда они будут разбиваться на части и находиться в разных файлах.Если ваш вариант использования требует, чтобы все строки с одинаковым ключом были в одном разделе, не используйте этот подход.

Существует два дополнительных бонуса: (1) сортировка данных по их размеруна диске часто можно уменьшить (например, сортировка всех событий по user_id, а затем по времени приведет к большому количеству повторений в значениях столбцов, что способствует сжатию) и (2) если вы записываете в формат файла поддерживающий его (например, Parquet)затем последующие читатели могут оптимально считывать данные с помощью предиката push-down, потому что средство записи паркета запишет значения MAX и MIN каждого столбца в метаданных, что позволяет читателю пропускать строки, если запрос указывает значения за пределами раздела (min, max) range.

Обратите внимание, что сортировка в Spark обходится дороже, чем просто перераспределение и требует дополнительного этапа.За кулисами Spark сначала определяет разбиения на одном этапе, а затем перетасовывает данные в эти разбиения на другом этапе.

(3) df.rdd.partitionBy (customPartitioner) .toDF (). Write.parquet(writePath)

Если вы используете spark в Scala, то вы можете написать пользовательский разделитель, который сможет справиться с досадными ошибками разделителя на основе хеша.К сожалению, не вариант в PySpark.Если вы действительно хотите написать пользовательский разделитель в pySpark, я обнаружил, что это возможно, хотя и немного неловко, используя rdd.repartitionAndSortWithinPartitions:

df.rdd \
  .keyBy(sort_key_function) \  # Convert to key-value pairs
  .repartitionAndSortWithinPartitions(numPartitions=N_WRITE_PARTITIONS, 
                                      partitionFunc=part_func) \
  .values() # get rid of keys \
.toDF().write.parquet(writePath)

Может быть, кто-то другой знает более простой способ использованияпользовательский разделитель на фрейме данных в pyspark?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...