Запись DataFrame в Parquet или Delta не выглядит распараллеленной - слишком долго - PullRequest
2 голосов
/ 28 января 2020

Постановка проблемы

Я прочитал секционированный CSV-файл в Spark Dataframe.

Для того, чтобы использовать улучшения Delta Tables, которые я пытаюсь просто экспортируйте его как Delta в каталог внутри Azure Data Lake Storage Gen2. Я использую приведенный ниже код в блокноте Databricks:

%scala

df_nyc_taxi.write.partitionBy("year", "month").format("delta").save("/mnt/delta/")

Весь фрейм данных имеет около 160 ГБ.

Спецификации аппаратного обеспечения

Я использую этот код с использованием кластера с 12 ядрами и 42 ГБ ОЗУ.

Однако выглядит как весь текст Процесс обрабатывается Spark / Databricks последовательно, например, непараллельно :

enter image description here

Визуализация DAG выглядит следующим образом :

enter image description here

В целом, на выполнение этой задачи уйдет 1-2 часа.

Вопросы

  • Есть ли способ заставить Spark выполнять запись в разные разделы параллельно?
  • Может быть, проблема в том, что я пытаюсь записать дельта-таблицу непосредственно в Azure Хранилище озера данных?

Ответы [ 2 ]

3 голосов
/ 03 февраля 2020

Чтобы прокомментировать комментарий @eliasah, вы можете попробовать это:

import org.apache.spark.sql.functions
df_nyc_taxi.repartition(col("year"), col("month"), lit(rand() * 200)).write.partitionBy("year", "month").format("delta").save("/mnt/delta/")

Ответ от @eliasah, скорее всего, создаст только один файл для каждого каталога "/ mnt / delta / year = XX / month = XX ", и только один работник запишет данные в каждый файл. Дополнительные столбцы будут дополнительно разделять данные (в этом случае я делю данные в каждом исходном файле на 200 меньших разделов, вы можете редактировать их, если хотите), так чтобы большее количество работников могло писать одновременно.

PS: извините, у меня пока недостаточно комментариев, чтобы комментировать: 'D

1 голос
/ 08 февраля 2020

Это похоже на другой ответ, однако я добавил персистент после перераспределения и перед его написанием. Persist будет go в памяти, а остаток (оставшийся после заполнения памяти) будет перетекать на диск, что все равно будет быстрее, чем повторное чтение. Это хорошо сработало в прошлом для меня. Я выбрал 1250 разделов, так как 128Мб это мой обычный go для размера раздела. Spark стал тем, чем является, из-за вычислений в памяти, поэтому лучше применять его всякий раз, когда у вас есть такая возможность.

from pyspark.sql import functions as F
df_nyc_taxi.repartition(1250,F.col("year"), col("month"))\
.persist(StorageLevel.MEMORY_AND_DISK).write.partitionBy("year", "month")\
.format("delta").save("/mnt/delta/")
...