Как я могу сгруппировать и получить среднее значение в Spark Dataframe - PullRequest
2 голосов
/ 28 октября 2019

В настоящее время у меня есть такой фрейм данных

+------------+----------+
|         A  |    B     |
+------------+----------+
|aaaaaaaaaaaa|11        |
|aaaaaaaaaaaa|44        |
|bbbbbbbbbbbb|22        |
|aaaaaaaaaaaa|33        |
+------------+----------+

Я хочу получить медиану значения столбца B в каждом столбце A.

+------------+----------+
|         A  |    B     |
+------------+----------+
|aaaaaaaaaaaa|33        |
|bbbbbbbbbbbb|22        |
+------------+----------+

Как я могу это сделать? Спасибо за ответ на вопрос.

Ответы [ 3 ]

1 голос
/ 28 октября 2019

Вы можете использовать UDF , groupBy и collect_list для достижения этого. Вот пример кода в Scala:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

def median[T: Numeric](xs: IndexedSeq[T]): Double = {
    if (xs.isEmpty) 0.0
    else {
      // There is faster algorithms (O(N), n-th order statistics) for finding percentiles,
      // but let's go with this one for simplicity - O(NlogN)
      val sorted = xs.sorted
      if (sorted.length % 2 == 1) implicitly[Numeric[T]].toDouble(sorted(sorted.length / 2))
      else {
        // [1, 2]
        val a = sorted(sorted.length / 2)
        val b = sorted(sorted.length / 2 - 1)
        implicitly[Numeric[T]].toDouble(implicitly[Numeric[T]].plus(a, b)) / 2
      }
    }
}

/// .... 

// This is important to make `toDF` visible!
import spark.sqlContext.implicits._

val medianUDF: UserDefinedFunction = udf[Double, IndexedSeq[Int]](median[Int])
val df: DataFrame = Seq(("aaaaaaaaaaaa", 11), ("aaaaaaaaaaaa", 44), ("bbbbbbbbbbbb", 22), ("aaaaaaaaaaaa", 33))
  .toDF("A", "B")
df.show()
//  +------------+---+
//  |           A|  B|
//  +------------+---+
//  |aaaaaaaaaaaa| 11|
//  |aaaaaaaaaaaa| 44|
//  |bbbbbbbbbbbb| 22|
//  |aaaaaaaaaaaa| 33|
//  +------------+---+

// Using UDF as aggregation function. Input for this UDF is indexed sequence - result from collect_list
df.groupBy(col("A"))
  .agg(medianUDF(collect_list(col("B"))).as("median"))
  .show()
//    +------------+------+
//    |           A|median|
//    +------------+------+
//    |bbbbbbbbbbbb|  22.0|
//    |aaaaaaaaaaaa|  33.0|
//    +------------+------+
0 голосов
/ 29 октября 2019

Вот способ вычислить все медианы параллельно (приблизительно, конечно).

Давайте начнем с сбора всех возможных ключей:

// generating data (BTW, please provide that code next time you ask a question)
val df = Seq(("aaaaaaaaaaaa", 11), ("aaaaaaaaaaaa", 44),
             ("bbbbbbbbbbbb", 22), ("aaaaaaaaaaaa", 33))
    .toDF("A", "B")

val cols = df.select("A").distinct.collect.map(_.getAs[String](0))

Допустим, cols имеетразмер N. Один из способов сделать это - перебрать cols и вычислить медиану через N отдельных заданий. Другой ответ - код для этого.

Тем не менее, можно вычислить все медианы параллельно (и, следовательно, только одну работу), используя сводную точку.

val precision = 1e-3
val medians = df
    // the index is artificial, it is just meant to treat each line individually
    .withColumn("index", monotonicallyIncreasingId)
    .groupBy("index")
    .pivot("A").agg(first('B))
    .stat.approxQuantile(cols, Array(0.5), precision)

val result = cols.indices.map(i => cols(i) -> medians(i)(0)).toMap

Возможно, это не такСтоит только с несколькими ключами, может быть интересно, если у вас их больше.

EDIT Первое решение сохраняет одну строку на строку в исходном кадре данных и будет работать, я думаю, всевремя. Если у вас много ключей, было бы интересно вычислить индекс с помощью такого окна.

.withColumn("index", row_number() over Window.partitionBy("A").orderBy("B"))

Но не используйте окна, если у вас есть миллионы строк на ключ. Это может быть очень медленным или даже разбить вашу работу.

0 голосов
/ 28 октября 2019

Приблизительный процентиль может быть решением:

val a = df.select("A").distinct.collect.flatMap(_.toSeq)
val quantile = a.map(valueOfA => (valueOfA, df.where(col("A").equalTo(valueOfA))))
      .map(df => (df._1, df._2.stat.approxQuantile("B", Array(0.5), 0.25)(0)))
quantile.foreach(println)

отпечатков

(bbbbbbbbbbbb,22.0)
(aaaaaaaaaaaa,33.0)

но это - как говорит название функции - только приближение

...