Spark несколько динамических агрегатных функций, countDistinct не работает - PullRequest
2 голосов
/ 11 апреля 2019

Агрегация на фрейме данных Spark с несколькими операциями динамического агрегирования.

Я хочу выполнить агрегирование на фрейме данных Spark с использованием Scala с несколькими операциями динамического агрегирования (передается пользователем в JSON). Я конвертирую JSON в Map.

Ниже приведен пример данных:

colA    colB    colC    colD
1       2       3       4
5       6       7       8
9       10      11      12

Код агрегации Spark, который я использую:

var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)

Я должен передать aggFuncMap только как Map, чтобы пользователь мог передать любое количество агрегатов через конфигурацию JSON.

Приведенный выше код работает нормально для некоторых агрегатов, включая sum, min, max, avg и count.

Однако, к сожалению, этот код не работает для countDistinct (может, потому что это верблюжий случай?).

При запуске кода выше, я получаю эту ошибку:

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: неопределенная функция: 'countdistinct'. Эта функция не является ни зарегистрированной временной функцией, ни постоянной функцией, зарегистрированной в базе данных по умолчанию

Любая помощь будет оценена!

1 Ответ

2 голосов
/ 12 апреля 2019

В настоящее время невозможно использовать agg с countDistinct внутри Map.Из документации мы видим:

Доступны агрегированные методы: avg, max, min, sum, count.


A возможноИсправить можно было бы изменить Map на Seq[Column],

val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

, но это не очень поможет, если пользователь определит агрегаты в файле конфигурации.

Другой подход заключается в использовании expr, эта функция будет вычислять строку и возвращать столбец.Однако expr не примет "countDistinct", вместо этого необходимо использовать "count(distinct(...))".Это может быть закодировано следующим образом:

val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)
...