Вы можете сделать переименование в агрегации для pivot
, используя alias
:
import pyspark.sql.functions as f
data_wide = df.groupBy('user_id')\
.pivot('type')\
.agg(*[f.sum(x).alias(x) for x in df.columns if x not in {"user_id", "type"}])
data_wide.show()
#+-------+-----------------+------------------+----+------------------+----+------------------+
#|user_id| A_d1| A_d2|A_d3| B_d1|B_d2| B_d3|
#+-------+-----------------+------------------+----+------------------+----+------------------+
#| c1|6.199999999999999| 0.8| 3.8| 15.0| 0.2| 0.11|
#| c2| 7.3|18.799999999999997| 7.3|10.299999999999999|15.0|4.3999999999999995|
#+-------+-----------------+------------------+----+------------------+----+------------------+
Однако это действительно ничем не отличается отделать pivot
и переименовывать потом.Вот план выполнения для этого метода:
#== Physical Plan ==
#HashAggregate(keys=[user_id#0], functions=[pivotfirst(type#1, sum(`d1`) AS `d1`#169, A, B, 0, 0), pivotfirst(type#1, sum(`d2`)
#AS `d2`#170, A, B, 0, 0), pivotfirst(type#1, sum(`d3`) AS `d3`#171, A, B, 0, 0)])
#+- Exchange hashpartitioning(user_id#0, 200)
# +- HashAggregate(keys=[user_id#0], functions=[partial_pivotfirst(type#1, sum(`d1`) AS `d1`#169, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d2`) AS `d2`#170, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d3`) AS `d3`#171, A, B, 0, 0)])
# +- *HashAggregate(keys=[user_id#0, type#1], functions=[sum(d1#2), sum(d2#3), sum(d3#4)])
# +- Exchange hashpartitioning(user_id#0, type#1, 200)
# +- *HashAggregate(keys=[user_id#0, type#1], functions=[partial_sum(d1#2), partial_sum(d2#3), partial_sum(d3#4)])
# +- Scan ExistingRDD[user_id#0,type#1,d1#2,d2#3,d3#4]
Сравните это с методом из этого ответа :
import re
def clean_names(df):
p = re.compile("^(\w+?)_([a-z]+)\((\w+)\)(?:\(\))?")
return df.toDF(*[p.sub(r"\1_\3", c) for c in df.columns])
pivoted = df.groupBy('user_id').pivot('type').sum()
clean_names(pivoted).explain()
#== Physical Plan ==
#HashAggregate(keys=[user_id#0], functions=[pivotfirst(type#1, sum(`d1`)#363, A, B, 0, 0), pivotfirst(type#1, sum(`d2`)#364, A, B, 0, 0), pivotfirst(type#1, sum(`d3`)#365, A, B, 0, 0)])
#+- Exchange hashpartitioning(user_id#0, 200)
# +- HashAggregate(keys=[user_id#0], functions=[partial_pivotfirst(type#1, sum(`d1`)#363, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d2`)#364, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d3`)#365, A, B, 0, 0)])
# +- *HashAggregate(keys=[user_id#0, type#1], functions=[sum(d1#2), sum(d2#3), sum(d3#4)])
# +- Exchange hashpartitioning(user_id#0, type#1, 200)
# +- *HashAggregate(keys=[user_id#0, type#1], functions=[partial_sum(d1#2), partial_sum(d2#3), partial_sum(d3#4)])
# +- Scan ExistingRDD[user_id#0,type#1,d1#2,d2#3,d3#4]
Вы увидите, что обапрактически идентичны.Вероятно, вы избежите небольшого ускорения, избегая регулярного выражения, но оно будет незначительным по сравнению с pivot
.