Spark-агрегаты, в которых выходные столбцы являются функциями, а строки - столбцами. - PullRequest
2 голосов
/ 21 февраля 2020

Я хочу вычислить кучу различных функций agg для разных столбцов в кадре данных.

Я знаю, что могу сделать что-то подобное, но вывод - одна строка.

df.agg(max("cola"), min("cola"), max("colb"), min("colb"))

Допустим, я буду выполнять 100 различных агрегаций для 10 различных столбцов.

Я хочу, чтобы выходной кадр данных был таким -

      |Min|Max|AnotherAggFunction1|AnotherAggFunction2|...etc..
cola  | 1 | 10| ... 
colb  | 2 | NULL| ... 
colc  | 5 | 20| ... 
cold  | NULL | 42| ... 
...

Где мой строки - это каждый столбец, для которого я выполняю агрегацию, а мои столбцы - функции агрегации. Например, некоторые области будут нулевыми, если я не вычислю colb макс.

Как я могу выполнить sh это?

Ответы [ 2 ]

2 голосов
/ 22 февраля 2020

Вы можете создать столбец Map, скажем, Metrics, где ключи - это имена столбцов, а значения - структура агрегаций (max, min, avg, ...). Я использую функцию map_from_entries для создания столбца карты (доступно из Spark 2.4+). А затем просто взорвите карту, чтобы получить желаемую структуру.

Вот пример, который вы можете адаптировать под свои требования:

df = spark.createDataFrame([("A", 1, 2), ("B", 2, 4), ("C", 5, 6), ("D", 6, 8)], ['cola', 'colb', 'colc'])

agg = map_from_entries(array(
    *[
        struct(lit(c),
               struct(max(c).alias("Max"), min(c).alias("Min"))
               )
        for c in df.columns
    ])).alias("Metrics")

df.agg(agg).select(explode("Metrics").alias("col", "Metrics")) \
    .select("col", "Metrics.*") \
    .show()

#+----+---+---+
#|col |Max|Min|
#+----+---+---+
#|cola|D  |A  |
#|colb|6  |1  |
#|colc|8  |2  |
#+----+---+---+
1 голос
/ 23 февраля 2020

Вот одно решение, которое позволяет вам динамически устанавливать агрегаты из заранее определенного списка. Решение использует map_from_arrays и другие, поэтому совместимо с Spark> = 2.4.0:

from pyspark.sql.functions import lit, expr, array, map_from_arrays

df = spark.createDataFrame([
  [1, 2.3, 5000],
  [2, 5.3, 4000],
  [3, 2.1, 3000],
  [4, 1.5, 4500]
], ["cola", "colb", "colc"])

aggs = ["min", "max", "avg", "sum"]
aggs_select_expr = [f"value[{idx}] as {agg}" for idx, agg in enumerate(aggs)]

agg_keys = []
agg_values = []

# generate map here where key is col name and value an array of aggregations
for c in df.columns:
  agg_keys.append(lit(c)) # the key i.e cola
  agg_values.append(array(*[expr(f"{a}({c})") for a in aggs])) # the value i.e [expr("min(a)"), expr("max(a)"), expr("avg(a)"), expr("sum(a)")]

df.agg(
  map_from_arrays(
    array(agg_keys), 
    array(agg_values)
  ).alias("aggs")
) \
.select(explode("aggs")) \
.selectExpr("key as col", *aggs_select_expr) \
.show(10, False)

# +----+------+------+------+-------+
# |col |min   |max   |avg   |sum    |
# +----+------+------+------+-------+
# |cola|1.0   |4.0   |2.5   |10.0   |
# |colb|1.5   |5.3   |2.8   |11.2   |
# |colc|3000.0|5000.0|4125.0|16500.0|
# +----+------+------+------+-------+

Описание: с выражением array(*[expr(f"{a}({c})") for a in aggs]) мы создаем массив, который содержит все агрегаты текущего столбца. Каждый элемент сгенерированного массива оценивается с помощью оператора expr(f"{a}({c})", который будет производить, т.е. expr("min(a)").

Массив будет состоять из значений agg_values, которые вместе с agg_keys составят нашу окончательную карту через выражение map_from_arrays(array(agg_keys), array(agg_values)). Вот как выглядит структура карты:

map(
    cola -> [min(cola), max(cola), avg(cola), sum(cola)]
    colb -> [min(colb), max(colb), avg(colb), sum(colb)]
    colc -> [min(cola), max(colc), avg(cola), sum(colc)]
)

Чтобы извлечь необходимую нам информацию, мы должны взорвать предыдущую карту с помощью explode("aggs"), это создаст два столбца key и value который мы используем в нашем операторе select.

aggs_select_expr будет содержать значения в виде ["value[0] as min", "value[1] as max", "value[2] as avg", "value[3] as sum"], которые будут входными данными для selectExpr statememnt.

UPDATE:

Я понял, что есть более эффективный способ, опуская агрегацию, то есть неявную от groupBy до agg. Мы можем добиться того же с помощью встроенной функции create_map:

from pyspark.sql.functions import create_map, expr, array
from itertools import chain

df = spark.createDataFrame([
  [1, 2.3, 5000],
  [2, 5.3, 4000],
  [3, 2.1, 3000],
  [4, 1.5, 4500]
], ["cola", "colb", "colc"])

aggs = ["min", "max", "avg", "sum"]
aggs_select_expr = [f"value[{idx}] as {agg}" for idx, agg in enumerate(aggs)]

df.select(explode(
                  create_map(*list(
                        chain(*[(lit(c), array(*[expr(f"{a}({c})") for a in aggs])) 
                          for c in df.columns
                     ])))
                   )
         ) \
        .selectExpr("key as col", *aggs_select_expr)

Примечание : В дополнение к меньшему количеству кода, основным преимуществом второго подхода является тот факт, что он содержит только узкое преобразование и не широкое, т.е. groupBy. Что мы будем улучшать производительность, так как это позволит избежать перетасовки.

...