DataPrame Pyspark присоединяется неправильно, когда есть несколько вложенных полей - PullRequest
0 голосов
/ 03 сентября 2018

У меня есть фрейм данных с такой схемой:

root
 |-- docId: string (nullable = true)
 |-- Country: struct (nullable = true)
 |    |-- s1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = true)
 |    |-- s1: string (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s5: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- YOB: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

У меня есть новый фрейм данных, схема которого выглядит следующим образом:

root
 |-- docId: string (nullable = true)
 |-- Country: struct (nullable = false)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = false)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- YOB: struct (nullable = false)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)

Я хочу присоединиться к этим фреймам данных и иметь такую ​​структуру:

root
 |-- docId: string (nullable = true)
 |-- Country: struct (nullable = true)
 |    |-- s1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = true)
 |    |-- s1: string (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s5: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- YOB: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: long (containsNull = true) 
 |    |-- s5: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

Но, в свою очередь, я получаю фрейм данных после соединения следующим образом: корень

 |-- docId: string (nullable = true)
 |-- Country: struct (nullable = true)
 |    |-- s1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Country: struct (nullable = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = true)
 |    |-- s1: string (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- s5: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Gender: struct (nullable = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- YOB: struct (nullable = true)
 |    |-- s1: long (nullable = true)
 |    |-- s2: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s3: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- s4: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- YOB: struct (nullable = true)
 |    |-- s6: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

Что должно быть сделано? Я сделал и внешнее соединение на поле docId и вышеупомянутый фрейм данных, который я получаю.

1 Ответ

0 голосов
/ 04 сентября 2018

Dataframe не 'неправильно соединен', так как операция JOIN не должна сортировать Structs. Вы получаете, казалось бы, повторяющиеся столбцы, потому что JOIN берет столбцы из обоих фреймов данных при объединении. Вы должны сделать комбинацию явно:

Инициализация

import pyspark
from pyspark.sql import types as T
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

Во-первых, данные (я добавил только некоторые столбцы для справки, расширение их до вашего полного примера тривиально):

Country_schema1 = T.StructField("Country", T.StructType([T.StructField("s1", T.StringType(), nullable=True)]), nullable=True)
Gender_schema1 = T.StructField("Gender", T.StructType([T.StructField("s1", T.StringType(), nullable=True),
                                                      T.StructField("s2", T.StringType(), nullable=True)]))
schema1 = T.StructType([T.StructField("docId", T.StringType(), nullable=True),
                       Country_schema1,
                       Gender_schema1
                       ])
data1 = [("1",["1"], ["M", "X"])]

df1 = spark.createDataFrame(data1, schema=schema1)
df1.toJSON().collect()

Выход:

['{"docId":"1","Country":{"s1":"1"},"Gender":{"s1":"M","s2":"X"}}']

Второй кадр данных:

Country_schema2 = T.StructField("Country", T.StructType([T.StructField("s6", T.StringType(), nullable=True)]), nullable=True)
Gender_schema2 = T.StructField("Gender", T.StructType([T.StructField("s6", T.StringType(), nullable=True),
                                                      T.StructField("s7", T.StringType(), nullable=True)]))
schema2 = T.StructType([T.StructField("docId", T.StringType(), nullable=True),
                       Country_schema2,
                       Gender_schema2
                       ])
data2 = [("1",["2"], ["F", "Z"])]
df2 = spark.createDataFrame(data2, schema=schema2)
df2.toJSON().collect()

Выход:

['{"docId":"1","Country":{"s6":"2"},"Gender":{"s6":"F","s7":"Z"}}']

Теперь логика. Я думаю, что это проще, если сделать с помощью SQL. Сначала создайте таблицы:

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

Это запрос для выполнения. Это в основном указывает, какие поля должны быть SELECT ed (вместо всех), и оборачивает поля из StructField s в новую структуру, которая объединяет их:

result = spark.sql("SELECT df1.docID, "
                   "STRUCT(df1.Country.s1 AS s1, df2.Country.s6 AS s6) AS Country, "
                   "STRUCT(df1.Gender.s2 AS s2, df2.Gender.s6 AS s6, df2.Gender.s7 AS s7) AS Gender "
                   "FROM df1 JOIN df2 ON df1.docID=df2.docID")
result.show()

Выход:

+-----+-------+---------+
|docID|Country|   Gender|
+-----+-------+---------+
|    1| [1, 2]|[X, F, Z]|
+-----+-------+---------+

Лучше просматривать в формате JSON:

result.toJSON().collect()

['{"docID":"1","Country":{"s1":"1","s6":"2"},"Gender":{"s2":"X","s6":"F","s7":"Z"}}']
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...