проблема с фреймом pyspark cache () - PullRequest
0 голосов
/ 18 февраля 2020

У меня есть программа, написанная для распараллеливания процесса, кэш был применен после определенных преобразований в датафреймах. Скажем так:

df1 = df.filter()
df3 = df1.join(df2, join_cond, "left")
df3.cache()    #ex: it has col1, col2, col3, col4 columns

After cache, we have some other steps to take care:
#1
df4 = df3.select(df3.col1, df3.col2)
df4.filter(df3.col1 > 500).show()
#2
df5 = df3.select(df3.col3, df3.col4)
df5.filter(df3.col4 > 2000)

df3.unpersist()

Итак, в этом процессе, если возникнет какая-либо проблема или ошибка, нам придется разархивировать фрейм данных, или старый кэш будет уничтожен автоматически при повторном запуске программы.

Может Пожалуйста, помогите мне, как будет работать cache () , если в какой-то момент времени в программе возникнут сбои.

Спасибо

1 Ответ

0 голосов
/ 20 февраля 2020

кэш сохраняет результаты ленивых вычислений в памяти, поэтому после кэша любое преобразование может быть выполнено непосредственно из сканирования df в памяти и начать работу.

действие против преобразования, действие приводит к не-случайному df объект, как в вашем коде .show(), преобразование приводит к другому rdd / spark df, как в вашем коде .filter, .select, .join

только на основе вашего фрагмента кода, нет проблема, зависимость вашего df4 просто сканируется df3 -> df4 и есть только одно действие. Но если вы захотите позвонить df5.filter().show() или df4.show() снова, это станет проблемой. Поскольку вы unpersist df3, в памяти нет данных, чтобы восстановить df4, приложение spark должно запускаться с df1 -> df2 -> df3 -> df4.

Unpersist нарушает ваш код? нет, но определенно влияет на производительность вашего приложения. Я дважды проверим, не нужен ли постоянный df в дальнейшей работе, а затем отменю тест

...