У вас есть несколько вариантов.В моем коде ниже я предполагаю, что вы хотите писать в паркет, но, конечно, вы можете изменить это.
(1) df.repartition (numPartitions, * cols) .write.partitionBy (* cols).parquet (writePath)
Сначала будет использовано разбиение на основе хеша, чтобы гарантировать, что ограниченное число значений из COL попадет в каждый раздел.В зависимости от значения, выбранного для numPartitions
, некоторые разделы могут быть пустыми, в то время как другие могут быть переполнены значениями - для тех, кто не знает почему, прочитайте это .Затем, когда вы вызываете partitionBy
для DataFrameWriter, каждое уникальное значение в каждом разделе будет помещено в его отдельный файл.
Предупреждение: этот подход может привести к разным размерам разделов и выполнению односторонних задачtimes. Это происходит, когда значения в вашем столбце связаны со многими строками (например, столбец города - в файле для Нью-Йорка может быть много строк), тогда как другие значения менее многочисленны (например, значения для маленькихtowns).
(2) df.sort (sortCols) .write.parquet (writePath)
Эта опция прекрасно работает, когда вы хотите, чтобы (1) записываемые вами файлы были почти равныразмеры (2) точный контроль над количеством записанных файлов.Этот подход сначала глобально сортирует ваши данные, а затем находит разбиения, которые разбивают данные на k
разделы равномерного размера, где k
указано в конфигурации config spark.sql.shuffle.partitions
.Это означает, что все значения с одинаковыми значениями вашего ключа сортировки смежны друг с другом, но иногда они будут разбиваться на части и находиться в разных файлах.Если ваш вариант использования требует, чтобы все строки с одинаковым ключом были в одном разделе, не используйте этот подход.
Существует два дополнительных бонуса: (1) сортировка данных по их размеруна диске часто можно уменьшить (например, сортировка всех событий по user_id, а затем по времени приведет к большому количеству повторений в значениях столбцов, что способствует сжатию) и (2) если вы записываете в формат файла поддерживающий его (например, Parquet)затем последующие читатели могут оптимально считывать данные с помощью предиката push-down, потому что средство записи паркета запишет значения MAX и MIN каждого столбца в метаданных, что позволяет читателю пропускать строки, если запрос указывает значения за пределами раздела (min, max) range.
Обратите внимание, что сортировка в Spark обходится дороже, чем просто перераспределение и требует дополнительного этапа.За кулисами Spark сначала определяет разбиения на одном этапе, а затем перетасовывает данные в эти разбиения на другом этапе.
(3) df.rdd.partitionBy (customPartitioner) .toDF (). Write.parquet(writePath)
Если вы используете spark в Scala, то вы можете написать пользовательский разделитель, который сможет справиться с досадными ошибками разделителя на основе хеша.К сожалению, не вариант в PySpark.Если вы действительно хотите написать пользовательский разделитель в pySpark, я обнаружил, что это возможно, хотя и немного неловко, используя rdd.repartitionAndSortWithinPartitions
:
df.rdd \
.keyBy(sort_key_function) \ # Convert to key-value pairs
.repartitionAndSortWithinPartitions(numPartitions=N_WRITE_PARTITIONS,
partitionFunc=part_func) \
.values() # get rid of keys \
.toDF().write.parquet(writePath)
Может быть, кто-то другой знает более простой способ использованияпользовательский разделитель на фрейме данных в pyspark?