Это, вероятно, проще всего сделать вне pyspark, и если данные, с которыми вы работаете, достаточно малы, это, вероятно, то, что вы должны сделать, потому что выполнение этого pyspark не будет особенно эффективным.
Если по какой-то причине вам нужно это сделать, это pyspark, вы можете сделать это с помощью нескольких преобразований данных. Первое, что нам нужно сделать, - это преобразовать все отдельные кадры данных в одну и ту же схему, что позволит нам итеративно выбирать из каждого и объединять в конечный результат. Ниже приведен один из способов достижения этого.
from pyspark.sql.functions import lit,col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
a = [[10,1,0]]
b = [[20,6]]
c = [[1,5,3,4]]
dfa = spark.createDataFrame(a,['col1','col2','col3'])
dfb = spark.createDataFrame(b,['col1','col2'])
dfc = spark.createDataFrame(c,['col1','col2','col3','col4'])
dfdict = {'dfa':dfa,'dfb':dfb,'dfc':dfc}
columns = set([col for dfname in dfdict for col in dfdict[dfname].columns])
for dfname in dfdict:
for colname in columns-set(dfdict[dfname].columns):
dfdict[dfname] = dfdict[dfname].withColumn(colname, lit(None).cast(StringType()))
schema = StructType([StructField("col_name", StringType(), True)]+\
[StructField("value_"+dfname, IntegerType(), True) for dfname in dfdict])
resultdf=spark.createDataFrame([],schema = schema)
for colname in columns:
resultdf = resultdf\
.union(dfdict['dfa'].select(lit(colname).alias('col_name'),
col(colname).alias('value_dfa'))\
.crossJoin(dfdict['dfb'].select(col(colname).alias('value_dfb')))\
.crossJoin(dfdict['dfc'].select(col(colname).alias('value_dfc'))))
resultdf.orderBy('col_name').show()
>>>
+--------+---------+---------+---------+
|col_name|value_dfa|value_dfb|value_dfc|
+--------+---------+---------+---------+
| col1| 10| 20| 1|
| col2| 1| 6| 5|
| col3| 0| null| 3|
| col4| null| null| 4|
+--------+---------+---------+---------+
Могут быть способы повысить эффективность этого, удаляя перекрестные соединения и заменяя их чем-то более умным.
Если вам нужно работать с начальными кадрами данных, которые имеют несколько строк, вам нужно объединить строки вместе (или изменить требования к ожидаемому результату). Например, вы можете суммировать все как в следующем примере.
from pyspark.sql.functions import sum
d = [[1,2,3],[4,5,6]]
dfd = spark.createDataFrame(a,['col1','col2','col3'])
dfdagg = dfd.groupby().agg(*[sum(col) for colname in dfa.columns])
Где dfdagg
теперь можно использовать так же, как и другие кадры данных, использованные выше.