Ниже приведен код для достижения этой цели. Общий подход состоит в том, чтобы просто объединить все ваши данные и добавить столбец source
, чтобы пометить, откуда пришла каждая строка. Вызов объединения не должен изменять разбиение DataFrames , просто объединить все разделы в один uber DataFrame . Если у вас есть что-то, что вызывает перестановки, вы можете добавить столбец с исходным идентификатором раздела, используя spark_partition_id()
, а затем вызвать repartition
для столбцов source
и partition_id
.
from pyspark.sql.functions import struct, lit, col
df1 = sc.parallelize([
(1, 2, 3),
(2, 3, 4)
]).toDF(["col1", "col2", "col3"])
df2 = sc.parallelize([
(3, 4, 5),
(4, 5, 6)
]).toDF(["col1", "col2", "col3"])
# Setup the DF's for union. Their columns need to be in the same order and
# add a source column
df1_union = df1.select(lit("df1").alias("source"), *[col(c) for c in sorted(df1.columns)])
df2_union = df2.select(lit("df2").alias("source"), *[col(c) for c in sorted(df2.columns)])
# You could do this instead if the schemas are different
# df1_union = df1.select(lit("df1").alias("source"), struct(*df1.columns).alias("df1"), lit(None).alias("df2"))
# df2_union = df2.select(lit("df2").alias("source"), lit(None).alias("df1"), struct(*df2.columns).alias("df2"))
combined = df1_union.unionAll(df2_union)
combined.show()
combined.rdd.mapPartitions(lambda row: do whatever..)
Обратите внимание, вот как выглядят объединенные данные:
+------+----+----+----+
|source|col1|col2|col3|
+------+----+----+----+
| df1| 1| 2| 3|
| df1| 2| 3| 4|
| df2| 3| 4| 5|
| df2| 4| 5| 6|
+------+----+----+----+