Как оптимально использовать одно и то же разделение окон над разными аналитическими функциями? - PullRequest
1 голос
/ 06 октября 2019

У меня вопрос по приведенному ниже коду. Я использую один и тот же раздел Windows и запускаю на нем разные функции, такие как max, min и row_number. Выполняет ли Spark разбиение окон каждый раз, когда я ссылаюсь на withColumn, или же оно оптимально выполняет разбиение окон один раз, когда сталкивается с несколькими функциями в одном разделе Windows?

val windows = Window.partitionBy("id").orderBy(col("date").desc)

dataframe.withColumn("max", max(col("column1")).over(windows))
         .withColumn("min", min(col("column2")).over(windows))
         .withColumn("row_number", row_number().over(windows))

1 Ответ

1 голос
/ 06 октября 2019

Spark выполнит все агрегации по одному и тому же окну в одно и то же время.

Мы можем подтвердить это, проверив физический план , подготовленный катализатором . Catalyst - это механизм оптимизации, который оптимизирует все операции, выполняемые с набором данных. Вы можете просмотреть физический план , позвонив explain(true) в наборе данных:

val df = List(("1", "2019-01-01", "100", "66"), ("2", "2019-01-02", "555", "444"))
           .toDF("id", "date", "column1", "column2")

val windows = Window.partitionBy("id").orderBy(col("date").desc)

dataframe.withColumn("max", max(col("column1")).over(windows))
     .withColumn("min", min(col("column2")).over(windows))
     .withColumn("row_number", row_number().over(windows))
     .explain(true)

В моем случае план выглядит так:

== Physical Plan ==
Window [max(column1#26) windowspecdefinition(id#24, date#25 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max#503, min(column2#27) windowspecdefinition(id#24, date#25 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS min#510, row_number() windowspecdefinition(id#24, date#25 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#518], [id#24], [date#25 DESC NULLS LAST]
+- *(1) Sort [id#24 ASC NULLS FIRST, date#25 DESC NULLS LAST], false, 0
   +- Exchange hashpartitioning(id#24, 200)
      +- LocalTableScan [id#24, date#25, column1#26, column2#27]

Так как вы видите Sparkвыполнит следующие шаги:

  • сортировка всего набора данных
  • набор данных раздела по идентификатору
  • просмотр всех строк в наборе данных (сканирование) для выполнения агрегирования
...