Список фреймов данных в RDD в отдельном разделе - PullRequest
0 голосов
/ 27 июня 2018

У меня есть список искровых фреймов данных, и я должен выполнить с ними некоторую операцию Я хочу создать rdd из этого так, чтобы каждый фрейм данных помещался в отдельный раздел, чтобы я мог просто использовать mapPartitions на этом rdd для выполнения вычислений на каждом фрейме данных параллельно на отдельных узлах.

1 Ответ

0 голосов
/ 27 июня 2018

Ниже приведен код для достижения этой цели. Общий подход состоит в том, чтобы просто объединить все ваши данные и добавить столбец source, чтобы пометить, откуда пришла каждая строка. Вызов объединения не должен изменять разбиение DataFrames , просто объединить все разделы в один uber DataFrame . Если у вас есть что-то, что вызывает перестановки, вы можете добавить столбец с исходным идентификатором раздела, используя spark_partition_id(), а затем вызвать repartition для столбцов source и partition_id.

from pyspark.sql.functions import struct, lit, col

df1 = sc.parallelize([
    (1, 2, 3),
    (2, 3, 4)
]).toDF(["col1", "col2", "col3"])

df2 = sc.parallelize([
    (3, 4, 5),
    (4, 5, 6)
]).toDF(["col1", "col2", "col3"])

# Setup the DF's for union.  Their columns need to be in the same order and
# add a source column
df1_union = df1.select(lit("df1").alias("source"), *[col(c) for c in sorted(df1.columns)])
df2_union = df2.select(lit("df2").alias("source"), *[col(c) for c in sorted(df2.columns)])

# You could do this instead if the schemas are different
# df1_union = df1.select(lit("df1").alias("source"), struct(*df1.columns).alias("df1"), lit(None).alias("df2"))
# df2_union = df2.select(lit("df2").alias("source"), lit(None).alias("df1"), struct(*df2.columns).alias("df2"))

combined = df1_union.unionAll(df2_union) 

combined.show()
combined.rdd.mapPartitions(lambda row: do whatever..)

Обратите внимание, вот как выглядят объединенные данные:

+------+----+----+----+
|source|col1|col2|col3|
+------+----+----+----+
|   df1|   1|   2|   3|
|   df1|   2|   3|   4|
|   df2|   3|   4|   5|
|   df2|   4|   5|   6|
+------+----+----+----+
...