Spark groupby несколько столбцов отдельно - PullRequest
0 голосов
/ 31 октября 2018

У меня есть фрейм данных Spark, как показано ниже, и я хочу выполнить на нем несколько агрегатных функций в разных столбцах независимо друг от друга и получить статистику по одному столбцу.

val df = (Seq((1, "a", "1"),
              (1,"b", "3"),
              (1,"c", "6"),
              (2, "a", "9"),
              (2,"c", "10"),
              (1,"b","8" ),
              (2, "c", "3"),
              (3,"r", "19")).toDF("col1", "col2", "col3"))

df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|   1|
|   1|   b|   3|
|   1|   c|   6|
|   2|   a|   9|
|   2|   c|  10|
|   1|   b|   8|
|   2|   c|   3|
|   3|   r|  19|
+----+----+----+

Я хочу сгруппировать по col1 и col2 и получить среднее значение столбца col3, чтобы получить следующий выходной кадр данных:

+----+----+----+---------+---------+
|col1|col2|col3|mean_col1|mean_col2|
+----+----+----+---------+---------+
|   1|   a|   1|      4.5|      5.0|
|   1|   b|   3|      4.5|      5.5|
|   1|   c|   6|      4.5|     6.33|
|   2|   a|   9|     7.33|      5.0|
|   2|   c|  10|     7.33|     6.33|
|   1|   b|   8|      4.5|      5.5|
|   2|   c|   3|     7.33|     6.33|
|   3|   r|  19|     19.0|     19.0|
+----+----+----+---------+---------+

Это можно сделать с помощью следующих операций:

val col1df = df.groupBy("col1").agg(round(mean("col3"),2).alias("mean_col1"))

val col2df = df.groupBy("col2").agg(round(mean("col3"),2).alias("mean_col2"))

df.join(col1df, "col1").join(col2df, "col2").select($"col1",$"col2",$"col3",$"mean_col1",$"mean_col2").show()

Однако, если у меня будет еще много столбцов для группировки, мне потребуется выполнить несколько дорогостоящих операций объединения. Более того, группировка по каждому столбцу перед выполнением объединения кажется довольно громоздкой. Каков наилучший способ получить выходной фрейм данных, сводя к минимуму (и предпочтительно исключая) операции соединения и не создавая фреймы данных col1df и col2df?

1 Ответ

0 голосов
/ 31 октября 2018

Если вы хотите, чтобы ваша итоговая таблица содержала все исходные строки, это можно сделать с помощью функции window .

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = (Seq((1, "a", "1"),
    (1,"b", "3"),
    (1,"c", "6"),
    (2, "a", "9"),
    (2,"c", "10"),
    (1,"b","8" ),
    (2, "c", "3"),
    (3,"r", "19")).toDF("col1", "col2", "col3"))

  df.show(false)

  val col1Window = Window.partitionBy("col1").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
  val col2Window = Window.partitionBy("col2").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)


  val res = df
              .withColumn("mean_col1", round(mean("col3").over(col1Window), 2))
              .withColumn("mean_col2", round(mean("col3").over(col2Window), 2))

  res.show(false)

В контексте оконных функций partitionBy аналогичен groupBy, а rangeBetween определяет размер окна, в котором находятся все строки с одинаковым значением разделенных столбцов или это можно рассматривать как группу по столбцу.

...