Pyspark - транспонировать несколько данных - PullRequest
0 голосов
/ 07 ноября 2018

У меня есть несколько фреймов данных, которые выглядят следующим образом.

df1:

+---------+---------+---------+
|sum(col1)|sum(col2)|sum(col3)|
+---------+---------+---------+
|       10|        1|        0|
+---------+---------+---------+

df2:

+---------+---------+
|sum(col1)|sum(col2)|
+---------+---------+
|       20|        6|
+---------+---------+

df3:

+---------+---------+---------+---------+
|sum(col1)|sum(col2)|sum(col3)|sum(col4)|
+---------+---------+---------+---------+
|        1|        5|        3|        4|
+---------+---------+---------+---------+

Для приведенного выше примера вывод должен выглядеть следующим образом.

+--------+------+------+------+
|col_name|value1|value2|value3|
+--------+------+------+------+
|    col1|    10|    20|     1|
|    col2|     1|     6|     5|
|    col3|     0|  null|     3|
|    col4|  null|  null|     4|
+--------+------+------+------+

Я использую спарк 1.6.3 для этого. В приведенном выше примере у меня есть различные вычисления суммы для конкретной таблицы, но у меня есть несколько таблиц для расчета суммы для каждой таблицы, и выходные данные должны быть объединены в указанном формате.

Есть идеи, как этого добиться?

Ответы [ 2 ]

0 голосов
/ 07 ноября 2018

В качестве альтернативы, вы можете использовать функцию стека для транспонирования dfs, а затем объединить их

>>> df1x = df1.selectExpr("stack(3, 'col1', col1, 'col2', col2, 'col3', col3) as (col_name, value1)")
>>> df1x.show()
+--------+------+
|col_name|value1|
+--------+------+
|    col1|    10|
|    col2|     1|
|    col3|     0|
+--------+------+

>>> df2x = df2.selectExpr("stack(2, 'col1', col1, 'col2', col2) as (col_name, value2)")
>>> df2x.show()
+--------+------+
|col_name|value2|
+--------+------+
|    col1|    20|
|    col2|     6|
+--------+------+

>>> df3x = df3.selectExpr("stack(4, 'col1', col1, 'col2', col2, 'col3', col3, 'col4', col4) as (col_name, value3)")
>>> df3x.show()
+--------+------+
|col_name|value3|
+--------+------+
|    col1|     1|
|    col2|     5|
|    col3|     3|
|    col4|     4|
+--------+------+

>>> df1x.join(df2x, "col_name", "full").join(df3x, "col_name", "full").sort("col_name").show()
+--------+------+------+------+                                                 
|col_name|value1|value2|value3|
+--------+------+------+------+
|    col1|    10|    20|     1|
|    col2|     1|     6|     5|
|    col3|     0|  null|     3|
|    col4|  null|  null|     4|
+--------+------+------+------+
0 голосов
/ 07 ноября 2018

Это, вероятно, проще всего сделать вне pyspark, и если данные, с которыми вы работаете, достаточно малы, это, вероятно, то, что вы должны сделать, потому что выполнение этого pyspark не будет особенно эффективным.

Если по какой-то причине вам нужно это сделать, это pyspark, вы можете сделать это с помощью нескольких преобразований данных. Первое, что нам нужно сделать, - это преобразовать все отдельные кадры данных в одну и ту же схему, что позволит нам итеративно выбирать из каждого и объединять в конечный результат. Ниже приведен один из способов достижения этого.

from pyspark.sql.functions import lit,col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

a = [[10,1,0]]
b = [[20,6]]
c = [[1,5,3,4]]

dfa = spark.createDataFrame(a,['col1','col2','col3'])
dfb = spark.createDataFrame(b,['col1','col2'])
dfc = spark.createDataFrame(c,['col1','col2','col3','col4'])

dfdict = {'dfa':dfa,'dfb':dfb,'dfc':dfc}
columns = set([col for dfname in dfdict for col in dfdict[dfname].columns])

for dfname in dfdict:
    for colname in columns-set(dfdict[dfname].columns):
        dfdict[dfname] = dfdict[dfname].withColumn(colname, lit(None).cast(StringType()))

schema = StructType([StructField("col_name", StringType(), True)]+\
                    [StructField("value_"+dfname, IntegerType(), True) for dfname in dfdict])
resultdf=spark.createDataFrame([],schema = schema)

for colname in columns:
    resultdf = resultdf\
                .union(dfdict['dfa'].select(lit(colname).alias('col_name'),
                       col(colname).alias('value_dfa'))\
                .crossJoin(dfdict['dfb'].select(col(colname).alias('value_dfb')))\
                .crossJoin(dfdict['dfc'].select(col(colname).alias('value_dfc'))))

resultdf.orderBy('col_name').show()

>>>

+--------+---------+---------+---------+
|col_name|value_dfa|value_dfb|value_dfc|
+--------+---------+---------+---------+
|    col1|       10|       20|        1|
|    col2|        1|        6|        5|
|    col3|        0|     null|        3|
|    col4|     null|     null|        4|
+--------+---------+---------+---------+

Могут быть способы повысить эффективность этого, удаляя перекрестные соединения и заменяя их чем-то более умным.

Если вам нужно работать с начальными кадрами данных, которые имеют несколько строк, вам нужно объединить строки вместе (или изменить требования к ожидаемому результату). Например, вы можете суммировать все как в следующем примере.

from pyspark.sql.functions import sum

d = [[1,2,3],[4,5,6]]
dfd = spark.createDataFrame(a,['col1','col2','col3'])

dfdagg = dfd.groupby().agg(*[sum(col) for colname in dfa.columns])

Где dfdagg теперь можно использовать так же, как и другие кадры данных, использованные выше.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...