Объедините несколько информационных фреймов в один в Pyspark [non pandas df] - PullRequest
2 голосов
/ 18 октября 2019

Я получу кадры данных, сгенерированные один за другим в процессе. Я должен объединить их в один.

+--------+----------+
|    Name|Age       |
+--------+----------+
|Alex    |        30|
+--------+----------+


+--------+----------+
|    Name|Age       |
+--------+----------+
|Earl    |        32|
+--------+----------+


+--------+----------+
|    Name|Age       |
+--------+----------+
|Jane    |        15|
+--------+----------+

Наконец:

+--------+----------+
|    Name|Age       |
+--------+----------+
|Alex    |        30|
+--------+----------+
|Earl    |        32|
+--------+----------+
|Jane    |        15|
+--------+----------+

Перепробовал много опций, таких как concat, merge, append, но я думаю, что это библиотеки панд. Я не использую панд. Использование версий python 2.7 и Spark 2.2

Отредактировано для покрытия окончательного сценария с foreachpartition: вывод

l = [('Alex', 30)]
k = [('Earl', 32)]

ldf = spark.createDataFrame(l, ('Name', 'Age'))
ldf = spark.createDataFrame(k, ('Name', 'Age'))
# option 1:
union_df(ldf).show()
#option 2:
uxdf = union_df(ldf)
uxdf.show()

в обоих случаях:

+-------+---+
|   Name|Age|
+-------+---+
|Earl   | 32|
+-------+---+

1 Ответ

1 голос
/ 18 октября 2019

Вы можете использовать unionAll() для фреймов данных:

from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.union, dfs)

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))

unionAll(df1, df2, df3).show()

## +---+----+
## |  k|   v|
## +---+----+
## |  1|foo1|
## |  2|bar1|
## |  3|foo2|
## |  4|bar2|
## |  5|foo3|
## |  6|bar3|
## +---+----+

РЕДАКТИРОВАТЬ:

Вы можете создать пустой фрейм данных и продолжать объединять его:

# Create first dataframe
ldf = spark.createDataFrame(l, ["Name", "Age"])
ldf.show()
# Save it's schema
schema = ldf.schema
# Create an empty DF with the same schema, (you need to provide schema to create empty dataframe)
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
empty_df.show()
# Union the first DF with the empty df
empty_df = empty_df.union(ldf)
empty_df.show()
# New dataframe after some operations
ldf = spark.createDataFrame(k, schema)
# Union with the empty_df again
empty_df = empty_df.union(ldf)
empty_df.show()
# First DF ldf
+----+---+
|Name|Age|
+----+---+
|Alex| 30|
+----+---+

# Empty dataframe empty_df
+----+---+
|Name|Age|
+----+---+
+----+---+

# After first union empty_df.union(ldf)
+----+---+
|Name|Age|
+----+---+
|Alex| 30|
+----+---+
# After second union with new ldf 
+----+---+
|Name|Age|
+----+---+
|Alex| 30|
|Earl| 32|
+----+---+
...