искровая агрегация для столбца массива - PullRequest
0 голосов
/ 24 сентября 2018

У меня есть фрейм данных со столбцом массива.

val json = """[
{"id": 1, "value": [11, 12, 18]},
{"id": 2, "value": [23, 21, 29]}
]"""

val df = spark.read.json(Seq(json).toDS)

scala> df.show
+---+------------+
| id|       value|
+---+------------+
|  1|[11, 12, 18]|
|  2|[23, 21, 29]|
+---+------------+

Теперь мне нужно применить различные статистические функции к столбцу значения.Я могу позвонить explode и groupBy, например

df.select($"id", explode($"value").as("value")).groupBy($"id").agg(max("value"), avg("value")).show

+---+----------+------------------+
| id|max(value)|        avg(value)|
+---+----------+------------------+
|  1|        18|13.666666666666666|
|  2|        29|24.333333333333332|
+---+----------+------------------+

. Меня беспокоит то, что я разбил свой DataFrame на больший, а затем уменьшил его до исходного вызова groupBy.

Есть ли лучший (т.е. более эффективный) способ вызова агрегированных функций в столбце массива?Возможно, я могу реализовать UDF, но я не хочу сам реализовывать все UDF агрегирования.

РЕДАКТИРОВАТЬ.Кто-то ссылался на этот ТАК вопрос , но в моем случае он не работает.size работает нормально

scala> df.select($"id", size($"value")).show
+---+-----------+
| id|size(value)|
+---+-----------+
|  1|          3|
|  2|          3|
+---+-----------+

Но avg или max не работают.

1 Ответ

0 голосов
/ 24 сентября 2018

Краткий ответ - нет , вам нужно реализовать свой UDF для агрегирования по столбцу массива.По крайней мере, в последней версии Spark (2.3.1 на момент написания).Что, как вы правильно утверждаете, не очень эффективно, так как заставляет вас либо разбирать строки, либо платить за сериализацию и десерилизацию работы в API набора данных.

Для других, кто может найти этот вопрос, написать агрегаты вбезопасный для типов способ с наборами данных, вы можете использовать API Aggregator , который, по общему признанию, недостаточно хорошо документирован и с которым очень сложно работать, поскольку сигнатуры типов становятся довольно многословными.

Более длинный ответ заключается в том, что эта функциональность скоро появится (?) в Apache Spark 2.4.

Родительский выпуск SPARK-23899 добавляет:

  • array_max
  • array_min
  • агрегат
  • map
  • array_distinct
  • array_remove
  • array_join

и многие другие

Screencap slide 11 of Extending Spark SQL API with Easier to Use Array Types Operations

Этот доклад " Расширение Spark SQL API с помощью более простых в использовании операций типов массивов"был представлен на Spark + в июне 2018 годаAI Summit и охватывает новую функциональность.

Если бы она была выпущена, это позволило бы вам использовать функцию max, как в вашем примере, однако average немного сложнее.Как ни странно, array_sum отсутствует, но он может быть построен из функции aggregate.Вероятно, это будет выглядеть примерно так:

def sum_array(array_col: Column) = aggregate($"my_array_col", 0, (s, x) => s + x, s => s) df.select(sum_array($"my_array_col") Где нулевое значение - это начальное состояние буфера агрегата.

Как вы указали, size уже может получить длину массива., что означает, что можно было бы рассчитать среднее значение.

...