Вот способ вычислить все медианы параллельно (приблизительно, конечно).
Давайте начнем с сбора всех возможных ключей:
// generating data (BTW, please provide that code next time you ask a question)
val df = Seq(("aaaaaaaaaaaa", 11), ("aaaaaaaaaaaa", 44),
("bbbbbbbbbbbb", 22), ("aaaaaaaaaaaa", 33))
.toDF("A", "B")
val cols = df.select("A").distinct.collect.map(_.getAs[String](0))
Допустим, cols
имеетразмер N
. Один из способов сделать это - перебрать cols
и вычислить медиану через N отдельных заданий. Другой ответ - код для этого.
Тем не менее, можно вычислить все медианы параллельно (и, следовательно, только одну работу), используя сводную точку.
val precision = 1e-3
val medians = df
// the index is artificial, it is just meant to treat each line individually
.withColumn("index", monotonicallyIncreasingId)
.groupBy("index")
.pivot("A").agg(first('B))
.stat.approxQuantile(cols, Array(0.5), precision)
val result = cols.indices.map(i => cols(i) -> medians(i)(0)).toMap
Возможно, это не такСтоит только с несколькими ключами, может быть интересно, если у вас их больше.
EDIT Первое решение сохраняет одну строку на строку в исходном кадре данных и будет работать, я думаю, всевремя. Если у вас много ключей, было бы интересно вычислить индекс с помощью такого окна.
.withColumn("index", row_number() over Window.partitionBy("A").orderBy("B"))
Но не используйте окна, если у вас есть миллионы строк на ключ. Это может быть очень медленным или даже разбить вашу работу.