Это происходит из-за внутренней работы Spark и ее ленивой оценки.
Что делает Spark, когда вы звоните groupby
, join
, agg
, она присоединяет эти вызовы к плану df
объект. Поэтому, несмотря на то, что он ничего не выполняет с данными, вы создаете большой план выполнения, который хранится внутри объекта Spark DataFrame.
Только при вызове действия (show
, count
, write
, et c.), Spark оптимизирует план и выполняет его. Если план слишком велик, выполнение шага оптимизации может занять некоторое время. Также помните, что оптимизация плана происходит на водителе, а не на исполнителях. Поэтому, если ваш драйвер занят или перегружен, он также задерживает шаг оптимизации плана зажигания.
Полезно помнить, что объединения - это дорогостоящие операции в Spark, как для оптимизации, так и для выполнения. Если вы можете, вы всегда должны избегать объединений при работе с одним DataFrame и использовать вместо этого функциональность окна. Объединения следует использовать только в том случае, если вы объединяете разные фреймы данных из разных источников (разных таблиц).
Способ оптимизации кода может быть следующим:
import pyspark
import pyspark.sql.functions as f
w = pyspark.sql.Window.partitionBy(list_group_features)
agg_sum_exprs = [f.sum(f.col(c)).alias("sum_" + c).over(w) for c in list_columns]
res_df = df.select(df.columns + agg_sum_exprs)
Это должно быть масштабируемым и быстрым для большие списки list_group_features
и list_columns
.