Pyspark: как разбить фрейм данных на куски и сохранить их? - PullRequest
2 голосов
/ 04 августа 2020

Мне нужно разделить pyspark фрейм данных df и сохранить различные фрагменты.

Вот что я делаю: я определяю столбец id_tmp и разделяю фрейм данных на основе этого.

  chunk = 10000
  id1 = 0
  id2 = chunk
  df = df.withColumn('id_tmp', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
  c = df.count()
  while id1 < c:
    stop_df = df.filter( (tmp.id_tmp < id2) & (tmp.id_tmp >= id1))
    stop_df.write.format('com.databricks.spark.csv').save('myFolder/')
    id1+=chunk
    id2+=chunk

Есть ли более эффективный способ без определения столбца id_tmp

1 Ответ

0 голосов
/ 04 августа 2020

Я предлагаю вам использовать метод partitionBy из интерфейса DataFrameWriter, встроенного в Spark ( docs ). Вот пример.

Учитывая df DataFrame, идентификатор патрона должен быть одним или несколькими столбцами. В моем примере tmp_id. Следующий фрагмент кода генерирует DF с 12 записями с 4 идентификаторами блоков.

import pyspark.sql.functions as F
df = spark.range(0, 12).withColumn("id_tmp", F.col("id") % 4).orderBy("id_tmp")
df.show() 

Возвращает:

+---+------+
| id|id_tmp|
+---+------+
|  8|     0|
|  0|     0|
|  4|     0|
|  1|     1|
|  9|     1|
|  5|     1|
|  6|     2|
|  2|     2|
| 10|     2|
|  3|     3|
| 11|     3|
|  7|     3|
+---+------+

Для независимого сохранения каждого фрагмента вам необходимо:

(df
 .repartition("id_tmp")
 .write
 .partitionBy("id_tmp")
 .mode("overwrite")
 .format("csv")
 .save("output_folder"))

repartition перемешает записи так, чтобы каждый узел имел полный набор записей для одного значения id_tmp. Затем каждый фрагмент записывается в один файл с partitionBy.

Результирующая структура папок:

output_folder/
output_folder/._SUCCESS.crc
output_folder/id_tmp=0
output_folder/id_tmp=0/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=0/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=1
output_folder/id_tmp=1/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=1/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=2
output_folder/id_tmp=2/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=2/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=3
output_folder/id_tmp=3/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=3/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/_SUCCESS

Размер и количество разделов очень важны для производительности Spark. Не разбивайте набор данных слишком много и используйте файлы разумного размера (например, 1 ГБ на файл), особенно если вы используете службы облачного хранения. Также рекомендуется использовать переменные раздела, если вы хотите отфильтровать данные при загрузке (например: год = ГГГГ / месяц = ​​ММ / день = ДД)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...