Вместо использования окна вы можете добиться того же с помощью пользовательской агрегации в сочетании с перекрестным объединением:
import pyspark.sql.functions as F
from pyspark.sql.functions import broadcast
from itertools import chain
df = spark.createDataFrame([
[1, 2.3, 1],
[2, 5.3, 2],
[3, 2.1, 4],
[4, 1.5, 5]
], ["index", "col1", "col2"])
agg_cols = [(
F.min(c).alias("min_" + c),
F.max(c).alias("max_" + c),
F.mean(c).alias("mean_" + c))
for c in df.columns if c.startswith('col')]
stats_df = df.agg(*list(chain(*agg_cols)))
# there is no performance impact from crossJoin since we have only one row on the right table which we broadcast (most likely Spark will broadcast it anyway)
df.crossJoin(broadcast(stats_df)).show()
# +-----+----+----+--------+--------+---------+--------+--------+---------+
# |index|col1|col2|min_col1|max_col1|mean_col1|min_col2|max_col2|mean_col2|
# +-----+----+----+--------+--------+---------+--------+--------+---------+
# | 1| 2.3| 1| 1.5| 5.3| 2.8| 1| 5| 3.0|
# | 2| 5.3| 2| 1.5| 5.3| 2.8| 1| 5| 3.0|
# | 3| 2.1| 4| 1.5| 5.3| 2.8| 1| 5| 3.0|
# | 4| 1.5| 5| 1.5| 5.3| 2.8| 1| 5| 3.0|
# +-----+----+----+--------+--------+---------+--------+--------+---------+
Примечание 1: При использовании широковещания мы будем избегать тасования, так как широковещательный df будет быть отправлено всем исполнителям.
Примечание 2: с помощью chain(*agg_cols)
мы сводим список кортежей, который мы создали на предыдущем шаге.
ОБНОВЛЕНИЕ:
Вот план выполнения для вышеуказанной программы:
== Physical Plan ==
*(3) BroadcastNestedLoopJoin BuildRight, Cross
:- *(3) Scan ExistingRDD[index#196L,col1#197,col2#198L]
+- BroadcastExchange IdentityBroadcastMode, [id=#274]
+- *(2) HashAggregate(keys=[], functions=[finalmerge_min(merge min#233) AS min(col1#197)#202, finalmerge_max(merge max#235) AS max(col1#197)#204, finalmerge_avg(merge sum#238, count#239L) AS avg(col1#197)#206, finalmerge_min(merge min#241L) AS min(col2#198L)#208L, finalmerge_max(merge max#243L) AS max(col2#198L)#210L, finalmerge_avg(merge sum#246, count#247L) AS avg(col2#198L)#212])
+- Exchange SinglePartition, [id=#270]
+- *(1) HashAggregate(keys=[], functions=[partial_min(col1#197) AS min#233, partial_max(col1#197) AS max#235, partial_avg(col1#197) AS (sum#238, count#239L), partial_min(col2#198L) AS min#241L, partial_max(col2#198L) AS max#243L, partial_avg(col2#198L) AS (sum#246, count#247L)])
+- *(1) Project [col1#197, col2#198L]
+- *(1) Scan ExistingRDD[index#196L,col1#197,col2#198L]
Здесь мы видим BroadcastExchange
из SinglePartition
, который транслирует одну строку, начиная с stats_df
может вписаться в SinglePartition
. Поэтому данные, которые здесь перетасовываются, представляют собой только одну строку (минимально возможная).