Переупорядочение StrucType и вложенных ArrayTypes - PullRequest
0 голосов
/ 03 мая 2018

У меня есть датафрейм со схемой:

root
 |-- col2: integer (nullable = true)
 |-- col1: integer (nullable = true)
 |-- structCol3: struct (nullable = true)
 |    |-- structField2: boolean (nullable = true)
 |    |-- structField1: string (nullable = true)
 |-- structCol3: struct (nullable = true)
 |    |-- nestedArray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- elem3: double (nullable = true)
 |    |    |    |-- elem2: string (nullable = true)
 |    |    |    |-- elem1: string (nullable = true)
 |    |-- structField2: integer (nullable = true)

и из-за проблем с совместимостью я пытаюсь вывести его в формате паркета, но в следующем формате:

root
 |-- col1: integer (nullable = true) 
 |-- col2: integer (nullable = true)
 |-- structCol3: struct (nullable = true)
 |    |-- structField1: string (nullable = true)
 |    |-- structField2: boolean (nullable = true)
 |-- structCol4: struct (nullable = true)
 |    |-- nestedArray: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- elem1: string (nullable = true)
 |    |    |    |-- elem2: string (nullable = true)
 |    |    |    |-- elem3: double (nullable = true)
 |    |-- structField2: integer (nullable = true)

До сих пор я успешно переставлял столбцы и поля внутри структур следующим образом:

dfParquetOutput = df.select(
    "col1",
    "col2",
    struct(
        col("structCol3.structField1"), 
        col("structCol3.structField2")
    ).alias("structCol3"),
    struct(
        col("structCol4.nestedArray"),
        col("structCol4.structField2")
    ).alias("structCol4")
)

К сожалению, я изо всех сил пытаюсь найти способ переупорядочить элементы внутри StructType, который находится внутри массива. Я думал о том, чтобы попытаться использовать udf, но из-за того, что я новичок в зажигании, у меня не получилось добиться успеха. Я также пытался создать новый фрейм данных с предопределенной схемой, но в моих тестах столбцы назначались на основе позиции, а не имени.

Есть ли простой способ переупорядочить Struct внутри массива.

1 Ответ

0 голосов
/ 03 мая 2018

Вы действительно не можете избежать udf (или RDD) здесь. Если вы определяете данные как

from pyspark.sql.functions import udf, struct, col
from collections import namedtuple

Outer = namedtuple("Outer", ["structCol4"])
Inner = namedtuple("Inner", ["nestedArray", "structField2"])
Element = namedtuple("Element", ["col3", "col2", "col1"])

df = spark.createDataFrame([Outer(Inner([Element("3", "2", "1")], 1))])

Вы можете

@udf("array<struct<col1: string, col2: string, col3: string>>")
def reorder(arr):
    return [(col1, col2, col3) for col3, col2, col1 in arr]

result = df.withColumn(
    "structCol4", 
     struct(reorder("structCol4.nestedArray").alias("nestedArray"), col("structCol4.structField2")))

result.printSchema()
# root
#  |-- structCol4: struct (nullable = false)
#  |    |-- nestedArray: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- col1: string (nullable = true)
#  |    |    |    |-- col2: string (nullable = true)
#  |    |    |    |-- col3: string (nullable = true)
#  |    |-- structField2: long (nullable = true)
# 


result.show()
# +----------------+
# |      structCol4|
# +----------------+
# |[[[1, 2, 3]], 1]|
# +----------------+

С глубоко вложенными схемами у вас будет полное реструктурирование дерева внутри udf, но здесь это не требуется.

...