Вот одно решение, которое позволяет вам динамически устанавливать агрегаты из заранее определенного списка. Решение использует map_from_arrays и другие, поэтому совместимо с Spark> = 2.4.0:
from pyspark.sql.functions import lit, expr, array, map_from_arrays
df = spark.createDataFrame([
[1, 2.3, 5000],
[2, 5.3, 4000],
[3, 2.1, 3000],
[4, 1.5, 4500]
], ["cola", "colb", "colc"])
aggs = ["min", "max", "avg", "sum"]
aggs_select_expr = [f"value[{idx}] as {agg}" for idx, agg in enumerate(aggs)]
agg_keys = []
agg_values = []
# generate map here where key is col name and value an array of aggregations
for c in df.columns:
agg_keys.append(lit(c)) # the key i.e cola
agg_values.append(array(*[expr(f"{a}({c})") for a in aggs])) # the value i.e [expr("min(a)"), expr("max(a)"), expr("avg(a)"), expr("sum(a)")]
df.agg(
map_from_arrays(
array(agg_keys),
array(agg_values)
).alias("aggs")
) \
.select(explode("aggs")) \
.selectExpr("key as col", *aggs_select_expr) \
.show(10, False)
# +----+------+------+------+-------+
# |col |min |max |avg |sum |
# +----+------+------+------+-------+
# |cola|1.0 |4.0 |2.5 |10.0 |
# |colb|1.5 |5.3 |2.8 |11.2 |
# |colc|3000.0|5000.0|4125.0|16500.0|
# +----+------+------+------+-------+
Описание: с выражением array(*[expr(f"{a}({c})") for a in aggs])
мы создаем массив, который содержит все агрегаты текущего столбца. Каждый элемент сгенерированного массива оценивается с помощью оператора expr(f"{a}({c})"
, который будет производить, т.е. expr("min(a)")
.
Массив будет состоять из значений agg_values
, которые вместе с agg_keys
составят нашу окончательную карту через выражение map_from_arrays(array(agg_keys), array(agg_values))
. Вот как выглядит структура карты:
map(
cola -> [min(cola), max(cola), avg(cola), sum(cola)]
colb -> [min(colb), max(colb), avg(colb), sum(colb)]
colc -> [min(cola), max(colc), avg(cola), sum(colc)]
)
Чтобы извлечь необходимую нам информацию, мы должны взорвать предыдущую карту с помощью explode("aggs")
, это создаст два столбца key
и value
который мы используем в нашем операторе select.
aggs_select_expr
будет содержать значения в виде ["value[0] as min", "value[1] as max", "value[2] as avg", "value[3] as sum"]
, которые будут входными данными для selectExpr
statememnt.
UPDATE:
Я понял, что есть более эффективный способ, опуская агрегацию, то есть неявную от groupBy
до agg
. Мы можем добиться того же с помощью встроенной функции create_map
:
from pyspark.sql.functions import create_map, expr, array
from itertools import chain
df = spark.createDataFrame([
[1, 2.3, 5000],
[2, 5.3, 4000],
[3, 2.1, 3000],
[4, 1.5, 4500]
], ["cola", "colb", "colc"])
aggs = ["min", "max", "avg", "sum"]
aggs_select_expr = [f"value[{idx}] as {agg}" for idx, agg in enumerate(aggs)]
df.select(explode(
create_map(*list(
chain(*[(lit(c), array(*[expr(f"{a}({c})") for a in aggs]))
for c in df.columns
])))
)
) \
.selectExpr("key as col", *aggs_select_expr)
Примечание : В дополнение к меньшему количеству кода, основным преимуществом второго подхода является тот факт, что он содержит только узкое преобразование и не широкое, т.е. groupBy
. Что мы будем улучшать производительность, так как это позволит избежать перетасовки.