Хотя это можно сделать с помощью groupBy
, оконные функции обычно проще, когда вам нужны все исходные строки во фрейме результирующих данных. Мы можем использовать collect_list
, но, как говорит документ , порядок является недетерминированным, поэтому давайте создадим кортежи томов и ключевых слов:
val txt =
"""Parent Keyword Volume
|P1 K1 100
|P1 K2 200
|P1 K3 150
|P2 K4 100
|P2 K5 200""".stripMargin.lines
.map(_.split("\\s+").mkString("|"))
.toSeq
.toDS()
val df = spark.read
.option("inferSchema", true)
.option("header", true)
.option("delimiter", "|")
.csv(txt)
val win = Window.partitionBy($"Parent")
val df1 =
df.select($"Keyword",
collect_list(struct(-$"Volume", $"Keyword")).over(win) as "rel")
Теперь у нас почти есть нужный формат
df1.select(array_sort($"rel") as "Related_keywords")
.show(20, false)
Выход:
+------------------------------------+
|Related_keywords |
+------------------------------------+
|[[-200, K5], [-100, K4]] |
|[[-200, K5], [-100, K4]] |
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
+------------------------------------+
Однако есть две проблемы, оригинальная Keyword
будет дублирована в списке, и перед всеми ключевыми словами будет отрицательный объем. Чтобы сделать это красивее, я считаю, что нужны UDF: s (не удалось найти функцию SQL для распаковки кортежей):
val myudf = udf(
(keyword: String, rel: Seq[Row]) =>
rel
.collect {
case Row(volume: Int, kw: String) if kw != keyword => (volume, kw)
}
.sortBy(_._1)
.map(_._2))
df1.select($"Keyword", myudf($"Keyword", $"rel") as "Related_keywords")
.show(20, false)
Выход:
+-------+----------------+
|Keyword|Related_keywords|
+-------+----------------+
|K4 |[K5] |
|K5 |[K4] |
|K1 |[K2, K3] |
|K2 |[K3, K1] |
|K3 |[K2, K1] |
+-------+----------------+