Поворот и объединение фрейма данных PySpark с псевдонимом - PullRequest
0 голосов
/ 07 декабря 2018

У меня есть PySpark DataFrame, похожий на этот:

df = sc.parallelize([
    ("c1", "A", 3.4, 0.4, 3.5), 
    ("c1", "B", 9.6, 0.0, 0.0),
    ("c1", "A", 2.8, 0.4, 0.3),
    ("c1", "B", 5.4, 0.2, 0.11),
    ("c2", "A", 0.0, 9.7, 0.3), 
    ("c2", "B", 9.6, 8.6, 0.1),
    ("c2", "A", 7.3, 9.1, 7.0),
    ("c2", "B", 0.7, 6.4, 4.3)
]).toDF(["user_id", "type", "d1", 'd2', 'd3'])
df.show()

, который дает:

+-------+----+---+---+----+
|user_id|type| d1| d2|  d3|
+-------+----+---+---+----+
|     c1|   A|3.4|0.4| 3.5|
|     c1|   B|9.6|0.0| 0.0|
|     c1|   A|2.8|0.4| 0.3|
|     c1|   B|5.4|0.2|0.11|
|     c2|   A|0.0|9.7| 0.3|
|     c2|   B|9.6|8.6| 0.1|
|     c2|   A|7.3|9.1| 7.0|
|     c2|   B|0.7|6.4| 4.3|
+-------+----+---+---+----+

И я повернул его на столбец type, агрегируя результат с sum():

data_wide = df.groupBy('user_id')\
.pivot('type').sum()
data_wide.show()

, что дает:

 +-------+-----------------+------------------+-----------+------------------+-----------+------------------+
|user_id|      A_sum(`d1`)|       A_sum(`d2`)|A_sum(`d3`)|       B_sum(`d1`)|B_sum(`d2`)|       B_sum(`d3`)|
+-------+-----------------+------------------+-----------+------------------+-----------+------------------+
|     c1|6.199999999999999|               0.8|        3.8|              15.0|        0.2|              0.11|
|     c2|              7.3|18.799999999999997|        7.3|10.299999999999999|       15.0|4.3999999999999995|
+-------+-----------------+------------------+-----------+------------------+-----------+------------------+

Теперь имена результирующих столбцов содержат символ `(тильда), и это проблема, например, для введения этогоновые столбцы в Vector Assembler, потому что он возвращает syntax error in attribute name.По этой причине мне нужно переименовать имена столбцов, но вызов метода withColumnRenamed внутри цикла или внутри функции reduce(lambda...) занимает много времени (на самом деле мой df имеет 11,520 столбцов).

Есть ли способ избежать этого символа в шаге агрегирования pivot + или рекурсивно назначить псевдоним, который зависит от имени нового столбца pivoted?

Заранее спасибо

Ответы [ 2 ]

0 голосов
/ 28 февраля 2019

Написал простую и быструю функцию для переименования сводных таблиц PySpark.Наслаждайтесь!:)

# This function efficiently rename pivot tables' urgly names
def rename_pivot_cols(rename_df, remove_agg):
    """change spark pivot table's default ugly column names at ease.
        Option 1: remove_agg = True: `2_sum(sum_amt)` --> `sum_amt_2`.
        Option 2: remove_agg = False: `2_sum(sum_amt)` --> `sum_sum_amt_2`
    """
    for column in rename_df.columns:
        if remove_agg == True:
            start_index = column.find('(')
            end_index = column.find(')')
            if (start_index > 0 and end_index > 0):
                rename_df = rename_df.withColumnRenamed(column, column[start_index+1:end_index]+'_'+column[:1])
        else:
            new_column = column.replace('(','_').replace(')','')
            rename_df = rename_df.withColumnRenamed(column, new_column[2:]+'_'+new_column[:1])   
    return rename_df
0 голосов
/ 10 декабря 2018

Вы можете сделать переименование в агрегации для pivot, используя alias:

import pyspark.sql.functions as f
data_wide = df.groupBy('user_id')\
    .pivot('type')\
    .agg(*[f.sum(x).alias(x) for x in df.columns if x not in {"user_id", "type"}])
data_wide.show()
#+-------+-----------------+------------------+----+------------------+----+------------------+
#|user_id|             A_d1|              A_d2|A_d3|              B_d1|B_d2|              B_d3|
#+-------+-----------------+------------------+----+------------------+----+------------------+
#|     c1|6.199999999999999|               0.8| 3.8|              15.0| 0.2|              0.11|
#|     c2|              7.3|18.799999999999997| 7.3|10.299999999999999|15.0|4.3999999999999995|
#+-------+-----------------+------------------+----+------------------+----+------------------+

Однако это действительно ничем не отличается отделать pivot и переименовывать потом.Вот план выполнения для этого метода:

#== Physical Plan ==
#HashAggregate(keys=[user_id#0], functions=[pivotfirst(type#1, sum(`d1`) AS `d1`#169, A, B, 0, 0), pivotfirst(type#1, sum(`d2`) 
#AS `d2`#170, A, B, 0, 0), pivotfirst(type#1, sum(`d3`) AS `d3`#171, A, B, 0, 0)])
#+- Exchange hashpartitioning(user_id#0, 200)
#   +- HashAggregate(keys=[user_id#0], functions=[partial_pivotfirst(type#1, sum(`d1`) AS `d1`#169, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d2`) AS `d2`#170, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d3`) AS `d3`#171, A, B, 0, 0)])
#      +- *HashAggregate(keys=[user_id#0, type#1], functions=[sum(d1#2), sum(d2#3), sum(d3#4)])
#         +- Exchange hashpartitioning(user_id#0, type#1, 200)
#            +- *HashAggregate(keys=[user_id#0, type#1], functions=[partial_sum(d1#2), partial_sum(d2#3), partial_sum(d3#4)])
#               +- Scan ExistingRDD[user_id#0,type#1,d1#2,d2#3,d3#4]

Сравните это с методом из этого ответа :

import re

def clean_names(df):
    p = re.compile("^(\w+?)_([a-z]+)\((\w+)\)(?:\(\))?")
    return df.toDF(*[p.sub(r"\1_\3", c) for c in df.columns])

pivoted = df.groupBy('user_id').pivot('type').sum()
clean_names(pivoted).explain()
#== Physical Plan ==
#HashAggregate(keys=[user_id#0], functions=[pivotfirst(type#1, sum(`d1`)#363, A, B, 0, 0), pivotfirst(type#1, sum(`d2`)#364, A, B, 0, 0), pivotfirst(type#1, sum(`d3`)#365, A, B, 0, 0)])
#+- Exchange hashpartitioning(user_id#0, 200)
#   +- HashAggregate(keys=[user_id#0], functions=[partial_pivotfirst(type#1, sum(`d1`)#363, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d2`)#364, A, B, 0, 0), partial_pivotfirst(type#1, sum(`d3`)#365, A, B, 0, 0)])
#      +- *HashAggregate(keys=[user_id#0, type#1], functions=[sum(d1#2), sum(d2#3), sum(d3#4)])
#         +- Exchange hashpartitioning(user_id#0, type#1, 200)
#            +- *HashAggregate(keys=[user_id#0, type#1], functions=[partial_sum(d1#2), partial_sum(d2#3), partial_sum(d3#4)])
#               +- Scan ExistingRDD[user_id#0,type#1,d1#2,d2#3,d3#4]

Вы увидите, что обапрактически идентичны.Вероятно, вы избежите небольшого ускорения, избегая регулярного выражения, но оно будет незначительным по сравнению с pivot.

...