Как избежать переоценки каждого преобразования в фрейме данных pyspark снова и снова - PullRequest
1 голос
/ 30 октября 2019

У меня есть кадр данных искры. Я делаю несколько преобразований на фрейме данных. Мой код выглядит так:

df = df.withColumn ........
df2 = df.filter......
df = df.join(df1 ...
df = df.join(df2 ...

Теперь у меня около 30 + таких преобразований. Также я знаю о сохранении фрейма данных. Поэтому, если у меня есть некоторые преобразования, подобные этому:

df1 = df.filter.....some condition
df2 = df.filter.... some condtion
df3 = df.filter... some other conditon

Я сохраняю фрейм данных "df" в приведенном выше случае.

Теперь проблема в том, что искра занимает слишком много времени для запуска(8 + mts) или иногда это терпит неудачу с проблемой пространства кучи Java. Но после более чем 10 преобразований, если я сохраню в таблицу (постоянную таблицу кустов) и прочту из таблицы в следующей строке, потребуется около 3 + мтс для завершения. Он не работает, даже если я сохраню его в промежуточной таблице памяти. Размер кластера тоже не проблема.

# some transformations
df.write.mode("overwrite").saveAsTable("test")
df = spark.sql("select * from test")
# some transormations    ---------> 3 mts

# some transformations
df.createOrReplaceTempView("test")
df.count() #action statement for view to be created
df = spark.sql("select * from test")
# some more transformations  --------> 8 mts.    

Я посмотрел на план spark sql (до сих пор не совсем понимаю). Похоже, что spark снова и снова переоценивает один и тот же фрейм данных.

Что я делаю не так? Мне не нужно записывать его в промежуточную таблицу.

Редактировать: я работаю над лазурными блоками данных 5.3 (включает Apache Spark 2.4.0, Scala 2.11)

1 Ответ

1 голос
/ 06 ноября 2019

Вы должны использовать кэширование.

Попробуйте использовать

df.cache
df.count

Использование счетчика для принудительного кэширования всей информации.

Также я рекомендую вам взглянуть на это и это

...