Агрегация на фрейме данных Spark с несколькими операциями динамического агрегирования.
Я хочу выполнить агрегирование на фрейме данных Spark с использованием Scala с несколькими операциями динамического агрегирования (передается пользователем в JSON). Я конвертирую JSON в Map
.
Ниже приведен пример данных:
colA colB colC colD
1 2 3 4
5 6 7 8
9 10 11 12
Код агрегации Spark, который я использую:
var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)
Я должен передать aggFuncMap
только как Map
, чтобы пользователь мог передать любое количество агрегатов через конфигурацию JSON.
Приведенный выше код работает нормально для некоторых агрегатов, включая sum
, min
, max
, avg
и count
.
Однако, к сожалению, этот код не работает для countDistinct
(может, потому что это верблюжий случай?).
При запуске кода выше, я получаю эту ошибку:
Исключение в потоке "main" org.apache.spark.sql.AnalysisException: неопределенная функция: 'countdistinct'. Эта функция не является ни зарегистрированной временной функцией, ни постоянной функцией, зарегистрированной в базе данных по умолчанию
Любая помощь будет оценена!