Я пытаюсь настроить производительность spark, используя разделение на кадре данных spark.Вот код:
file_path1 = spark.read.parquet(*paths[:15])
df = file_path1.select(columns) \
.where((func.col("organization") == organization))
df = df.repartition(10)
#execute an action just to make spark execute the repartition step
df.first()
Во время выполнения first()
я проверяю этапы работы в интерфейсе Spark и вот что я нахожу:
- Почему на этапе нет
repartition
шага? - Почему также есть этап 8? Я запросил только одно действие
first()
.Это из-за перестановки, вызванной repartition
? - Есть ли способ изменить перераспределение паркетных файлов без необходимости выполнять такие операции? Как изначально, когда я читаю
df
вы можете видеть, что он разделен на 43 тыс. разделов, что очень много (по сравнению с его размером, когда я сохраняю его в CSV-файл: 4 МБ с 13 тыс. строк) и создаю проблемы на следующих этапах, поэтому я хотелперераспределить его. - Должен ли я использовать
cache()
после перераспределения?df = df.repartition(10).cache()
?Как и при выполнении df.first()
во второй раз, я также получаю запланированный этап с 43k разделами, несмотря на то, что df.rdd.getNumPartitions()
вернул 10. EDIT: количество разделов просто для того, чтобы попробовать.мои вопросы направлены на то, чтобы помочь мне понять, как сделать правильное перераспределение.
Примечание: изначально Dataframe считывается из набора файлов паркета в Hadoop.
Я уже читал это Как работает раздел Spark (ING) для файлов в HDFS?