когда нецелесообразно использовать persist () для фрейма данных искры? - PullRequest
0 голосов
/ 12 февраля 2019

Работая над улучшением производительности кода, поскольку многие задания не выполнялись (прервано), я думал об использовании функции persist() в Spark Dataframe всякий раз, когда мне нужно использовать этот же самый кадр данных во многих других операциях.Делая это и следуя заданиям, этапам в интерфейсе приложения Spark, я чувствовал, что это не всегда оптимально, это зависит от количества разделов и размера данных.Я не был уверен до тех пор, пока работа не была прервана из-за сбоя на этапе сохранения.

Я задаю вопрос , не рекомендуется ли использовать persist() при выполнении многих операций надатафрейм всегда действителен? Если нет, то когда это не так?как судить?

Чтобы быть более конкретным, я представлю свой код и детали прерванной работы:

#create a dataframe from another one df_transf_1 on which I made a lot of transformations but no actions
spark_df = df_transf_1.select('user_id', 'product_id').dropDuplicates()
#persist
spark_df.persist()
products_df = spark_df[['product_id']].distinct()
df_products_indexed = products_df.rdd.map(lambda r: r.product_id).zipWithIndex().toDF(['product_id', 'product_index'])

Вы можете спросить, почему я сохранил spark_df?Это потому, что я собираюсь использовать его несколько раз, как с products_df, а также в joins (например: spark_df = spark_df.join(df_products_indexed,"product_id")

Job stages

Подробная информация о причине сбоя на этапе 3:

Задание прервано из-за сбоя этапа: сбой задачи 40458 на этапе 3.0, последний сбой: сбой задачи 40458.3 на этапе 3.0 (TID 60778, xx.xx.yyyy.com, исполнитель 91): ExecutorLostFailure (исполнитель 91 завершился из-за одной из запущенных задач) Причина: Slave потерянная трассировка стека драйверов:

Размер входных данных ( 4 ТБ) огромен, перед тем как продолжить, есть ли способ проверить размер данных? Это параметр при выборе сохранения или нет? Также количество разделов (задач) для persist > 100 000

Ответы [ 2 ]

0 голосов
/ 11 мая 2019

Вот два случая использования persist():

  • После использования repartition, чтобы избежать повторного перемешивания ваших данных, поскольку кадр данных используется в следующих шагах,Это будет полезно только в том случае, если вы вызываете более одного действия для постоянного кадра данных / СДР, поскольку постоянное является преобразованием и, следовательно, лениво оценивается.В общем случае, если у вас есть несколько действий на одном и том же фрейме данных / СДР.

  • Итерационные вычисления, например, когда вы хотите запросить фрейм данных внутри цикла for.С persist Spark сохранит промежуточные результаты и пропустит повторную оценку одних и тех же операций при каждом вызове действия.Другим примером будет добавление новых столбцов с join, как обсуждено здесь .

0 голосов
/ 25 апреля 2019

Мой опыт научил меня тому, что вы должны сохранять фрейм данных, когда выполняете над ними несколько операций, поэтому вы создаете временные таблицы (также вы гарантируете, что в случае сбоя у вас будет точка восстановления).Этим вы предотвращаете огромные DAG'ы, которые часто не заканчиваются, если у вас есть, например, объединения.Поэтому мой совет будет сделать что-то вроде этого:

# operations
df.write.saveAsTable('database.tablename_temp')
df = spark.table('database.tablename_temp')
# more operations
...