Dataframe не 'неправильно соединен', так как операция JOIN
не должна сортировать Structs. Вы получаете, казалось бы, повторяющиеся столбцы, потому что JOIN
берет столбцы из обоих фреймов данных при объединении. Вы должны сделать комбинацию явно:
Инициализация
import pyspark
from pyspark.sql import types as T
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
Во-первых, данные (я добавил только некоторые столбцы для справки, расширение их до вашего полного примера тривиально):
Country_schema1 = T.StructField("Country", T.StructType([T.StructField("s1", T.StringType(), nullable=True)]), nullable=True)
Gender_schema1 = T.StructField("Gender", T.StructType([T.StructField("s1", T.StringType(), nullable=True),
T.StructField("s2", T.StringType(), nullable=True)]))
schema1 = T.StructType([T.StructField("docId", T.StringType(), nullable=True),
Country_schema1,
Gender_schema1
])
data1 = [("1",["1"], ["M", "X"])]
df1 = spark.createDataFrame(data1, schema=schema1)
df1.toJSON().collect()
Выход:
['{"docId":"1","Country":{"s1":"1"},"Gender":{"s1":"M","s2":"X"}}']
Второй кадр данных:
Country_schema2 = T.StructField("Country", T.StructType([T.StructField("s6", T.StringType(), nullable=True)]), nullable=True)
Gender_schema2 = T.StructField("Gender", T.StructType([T.StructField("s6", T.StringType(), nullable=True),
T.StructField("s7", T.StringType(), nullable=True)]))
schema2 = T.StructType([T.StructField("docId", T.StringType(), nullable=True),
Country_schema2,
Gender_schema2
])
data2 = [("1",["2"], ["F", "Z"])]
df2 = spark.createDataFrame(data2, schema=schema2)
df2.toJSON().collect()
Выход:
['{"docId":"1","Country":{"s6":"2"},"Gender":{"s6":"F","s7":"Z"}}']
Теперь логика. Я думаю, что это проще, если сделать с помощью SQL. Сначала создайте таблицы:
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
Это запрос для выполнения. Это в основном указывает, какие поля должны быть SELECT
ed (вместо всех), и оборачивает поля из StructField
s в новую структуру, которая объединяет их:
result = spark.sql("SELECT df1.docID, "
"STRUCT(df1.Country.s1 AS s1, df2.Country.s6 AS s6) AS Country, "
"STRUCT(df1.Gender.s2 AS s2, df2.Gender.s6 AS s6, df2.Gender.s7 AS s7) AS Gender "
"FROM df1 JOIN df2 ON df1.docID=df2.docID")
result.show()
Выход:
+-----+-------+---------+
|docID|Country| Gender|
+-----+-------+---------+
| 1| [1, 2]|[X, F, Z]|
+-----+-------+---------+
Лучше просматривать в формате JSON:
result.toJSON().collect()
['{"docID":"1","Country":{"s1":"1","s6":"2"},"Gender":{"s2":"X","s6":"F","s7":"Z"}}']