Spark GroupBy и Aggregate Strings для создания карты подсчета строк на основе условия - PullRequest
0 голосов
/ 15 января 2020

У меня есть фрейм данных с двумя несколькими столбцами, два из которых являются id и меткой, как показано ниже.

+---+---+---+
| id| label|
+---+---+---+
|  1| "abc"|
|  1| "abc"|
|  1| "def"|
|  2| "def"|
|  2| "def"|
+---+---+---+

Я хочу, чтобы groupBy "id" и агрегировал столбец метки по количеству (игнорировать ноль) метки в структуре данных карты и ожидаемый результат, как показано ниже:

+---+---+--+--+--+--+--+--
| id| label             |
+---+-----+----+----+----+
|  1| {"abc":2, "def":1}|
|  2| {"def":2}         |
+---+-----+----+----+----+

Возможно ли это сделать без использования пользовательских агрегатных функций? Я видел аналогичный ответ здесь , но он не агрегируется на основе количества каждого элемента.

Я прошу прощения, если этот вопрос глуп, я новичок в Scala и Spark.

Спасибо

1 Ответ

1 голос
/ 16 января 2020

Без пользовательских UDF

import org.apache.spark.sql.functions.{map, collect_list}

df.groupBy("id", "label")
  .count
  .select($"id", map($"label", $"count").as("map"))
  .groupBy("id")
  .agg(collect_list("map"))
  .show(false)

+---+------------------------+                                                  
|id |collect_list(map)       |
+---+------------------------+
|1  |[[def -> 1], [abc -> 2]]|
|2  |[[def -> 2]]            |
+---+------------------------+

Использование пользовательских UDF,

import org.apache.spark.sql.functions.udf
val customUdf = udf((seq: Seq[String]) => {
  seq.groupBy(x => x).map(x => x._1 -> x._2.size)
})

df.groupBy("id")
  .agg(collect_list("label").as("list"))
  .select($"id", customUdf($"list").as("map"))
  .show(false)

+---+--------------------+
|id |map                 |
+---+--------------------+
|1  |[abc -> 2, def -> 1]|
|2  |[def -> 2]          |
+---+--------------------+
...