Spark выполнит все агрегации по одному и тому же окну в одно и то же время.
Мы можем подтвердить это, проверив физический план , подготовленный катализатором . Catalyst - это механизм оптимизации, который оптимизирует все операции, выполняемые с набором данных. Вы можете просмотреть физический план , позвонив explain(true)
в наборе данных:
val df = List(("1", "2019-01-01", "100", "66"), ("2", "2019-01-02", "555", "444"))
.toDF("id", "date", "column1", "column2")
val windows = Window.partitionBy("id").orderBy(col("date").desc)
dataframe.withColumn("max", max(col("column1")).over(windows))
.withColumn("min", min(col("column2")).over(windows))
.withColumn("row_number", row_number().over(windows))
.explain(true)
В моем случае план выглядит так:
== Physical Plan ==
Window [max(column1#26) windowspecdefinition(id#24, date#25 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max#503, min(column2#27) windowspecdefinition(id#24, date#25 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS min#510, row_number() windowspecdefinition(id#24, date#25 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#518], [id#24], [date#25 DESC NULLS LAST]
+- *(1) Sort [id#24 ASC NULLS FIRST, date#25 DESC NULLS LAST], false, 0
+- Exchange hashpartitioning(id#24, 200)
+- LocalTableScan [id#24, date#25, column1#26, column2#27]
Так как вы видите Sparkвыполнит следующие шаги:
- сортировка всего набора данных
- набор данных раздела по идентификатору
- просмотр всех строк в наборе данных (сканирование) для выполнения агрегирования