Я предлагаю вам использовать метод partitionBy
из интерфейса DataFrameWriter
, встроенного в Spark ( docs ). Вот пример.
Учитывая df
DataFrame, идентификатор патрона должен быть одним или несколькими столбцами. В моем примере tmp_id
. Следующий фрагмент кода генерирует DF с 12 записями с 4 идентификаторами блоков.
import pyspark.sql.functions as F
df = spark.range(0, 12).withColumn("id_tmp", F.col("id") % 4).orderBy("id_tmp")
df.show()
Возвращает:
+---+------+
| id|id_tmp|
+---+------+
| 8| 0|
| 0| 0|
| 4| 0|
| 1| 1|
| 9| 1|
| 5| 1|
| 6| 2|
| 2| 2|
| 10| 2|
| 3| 3|
| 11| 3|
| 7| 3|
+---+------+
Для независимого сохранения каждого фрагмента вам необходимо:
(df
.repartition("id_tmp")
.write
.partitionBy("id_tmp")
.mode("overwrite")
.format("csv")
.save("output_folder"))
repartition
перемешает записи так, чтобы каждый узел имел полный набор записей для одного значения id_tmp. Затем каждый фрагмент записывается в один файл с partitionBy
.
Результирующая структура папок:
output_folder/
output_folder/._SUCCESS.crc
output_folder/id_tmp=0
output_folder/id_tmp=0/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=0/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=1
output_folder/id_tmp=1/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=1/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=2
output_folder/id_tmp=2/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=2/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=3
output_folder/id_tmp=3/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=3/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/_SUCCESS
Размер и количество разделов очень важны для производительности Spark. Не разбивайте набор данных слишком много и используйте файлы разумного размера (например, 1 ГБ на файл), особенно если вы используете службы облачного хранения. Также рекомендуется использовать переменные раздела, если вы хотите отфильтровать данные при загрузке (например: год = ГГГГ / месяц = ММ / день = ДД)