Агрегация Spark Window и производительность Group By / Join - PullRequest
2 голосов
/ 17 июня 2020

Меня интересуют характеристики производительности выполнения агрегатных функций над окном по сравнению с группировкой / объединением. В этом случае меня не интересуют оконные функции с настраиваемыми границами или порядком фреймов, а только как способ запуска агрегатных функций.

Обратите внимание, что меня интересует пакетная (не потоковая) производительность для приличного только объемы данных, поэтому я отключил широковещательные объединения для следующего.

Например, допустим, мы начинаем со следующего DataFrame:

val df = Seq(("bob", 10), ("sally", 32), ("mike", 9), ("bob", 18)).toDF("name", "age")
df.show(false)

+-----+---+
|name |age|
+-----+---+
|bob  |10 |
|sally|32 |
|mike |9  |
|bob  |18 |
+-----+---+

Допустим, мы хотим подсчитать количество появлений каждого имени, а затем укажите это количество в строках с совпадающим именем.

Группировать по / присоединиться

val joinResult = df.join(
    df.groupBy($"name").count,
    Seq("name"),
    "inner"
)
joinResult.show(false)

+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1    |
|mike |9  |1    |
|bob  |18 |2    |
|bob  |10 |2    |
+-----+---+-----+

joinResult.explain
== Physical Plan ==
*(4) Project [name#5, age#6, count#12L]
+- *(4) SortMergeJoin [name#5], [name#15], Inner
   :- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(name#5, 200)
   :     +- LocalTableScan [name#5, age#6]
   +- *(3) Sort [name#15 ASC NULLS FIRST], false, 0
      +- *(3) HashAggregate(keys=[name#15], functions=[count(1)])
         +- Exchange hashpartitioning(name#15, 200)
            +- *(2) HashAggregate(keys=[name#15], functions=[partial_count(1)])
               +- LocalTableScan [name#15]

Окно

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => f}

val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
windowResult.show(false)

+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1    |
|mike |9  |1    |
|bob  |10 |2    |
|bob  |18 |2    |
+-----+---+-----+

windowResult.explain
== Physical Plan ==
Window [count(1) windowspecdefinition(name#5, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count#34L], [name#5]
+- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(name#5, 200)
      +- LocalTableScan [name#5, age#6]

На основе По планам выполнения похоже, что Windowing более эффективен (меньше этапов). Итак, мой вопрос: всегда ли это так - всегда ли мне использовать оконные функции для такого рода агрегации? Будут ли эти два метода масштабироваться одинаково по мере роста данных? А как насчет крайнего перекоса (т.е. некоторые имена встречаются намного чаще, чем другие)?

Ответы [ 2 ]

3 голосов
/ 17 июня 2020

Это зависит от данных. В частности, здесь это зависит от количества элементов столбца name. Если количество элементов мало, данные будут небольшими после агрегирования, и агрегированный результат можно будет транслировать в соединении. В этом случае соединение будет быстрее, чем window. С другой стороны, если количество элементов велико, а объем данных после агрегации велик, то соединение будет запланировано с использованием SortMergeJoin, использование window будет более эффективным.

В случае window у нас 1 тасовка всего + одна сортировка. В случае SortMergeJoin у нас есть то же самое в левой ветви (общее перемешивание + сортировка) плюс дополнительное сокращенное перемешивание и сортировка в правой ветви (под сокращением я подразумеваю, что данные агрегируются в первую очередь). В правой ветви соединения у нас также есть дополнительное сканирование данных.

Также вы можете проверить мое видео с Spark Summit, где я анализирую аналогичный пример.

1 голос
/ 17 июня 2020

Отключение трансляции по мере того, как вы заявляете, и генерация некоторых данных с временным подходом для 1M и 2M имен, генерируемых случайным образом, то есть приличного размера, время выполнения для плана 2 действительно лучше. 8, 8, 200 размеров разделов в кластере databricks (сообщество).

Сгенерированный план имеет умные функции для сортировки и подсчета через окно и, как вы говорите, меньше этапов. Похоже, это решающий аргумент. В масштабе у вас может быть больше разделов, но свидетельства склоняют меня к тому, чтобы приблизиться к 2.

Я пробовал случайные выборки имен (без учета возраста) и получил следующее: *

присоединиться за 48,361 секунды против 22,028 секунды для окна для 1M записей для .count

присоединиться за 85,814 секунд против 50,566 секунд для окна для 2M записей для .count после перезапуска кластера

соединение за 96,295 секунды против 43,875 секунды для окна для 2M записей для .count

Используемый код:

import scala.collection.mutable.ListBuffer
import scala.util.Random
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => f}

val alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
val size = alpha.size
def randStr(n:Int) = (1 to n).map(_ => alpha(Random.nextInt(size))).mkString

def timeIt[T](op: => T): Float = {
  val start = System.currentTimeMillis
  val res = op
  val end = System.currentTimeMillis
  (end - start) / 1000f
}

var names = new ListBuffer[String]()
for (i <- 1 to 2000000 ) {
    names += randStr(10)     
}
val namesList = names.toSeq
val df = namesList.toDF("name")

val joinResult = df.join(df.groupBy($"name").count, Seq("name"), "inner")
val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
val time1 = timeIt(joinResult.count)
val time2 = timeIt(windowResult.count)

println(s"join in $time1 seconds vs $time2 seconds for window")

Более того, этот вопрос еще демонстрирует незрелость Spark Optimizer.

...