Как использовать to_json и from_json для устранения вложенных структурных полей в фрейме данных pyspark? - PullRequest
0 голосов
/ 05 октября 2019

Это решение в теории идеально работает для того, что мне нужно, а именно для создания новой скопированной версии фрейма данных, исключая при этом определенные вложенные структурные поля. Вот минимально воспроизводимый артефакт моей проблемы:

>>> df.printSchema()
root
| -- big: array(nullable=true)
| | -- element: struct(containsNull=true)
| | | -- keep: string(nullable=true)
| | | -- delete: string(nullable=true)

, экземпляр которого вы можете создать следующим образом:

schema = StructType([StructField("big", ArrayType(StructType([
    StructField("keep", StringType()),
    StructField("delete", StringType())
])))])
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

Моя цель - преобразовать фрейм данных (вместе со значениямив столбцах, которые я хочу сохранить) к тому, который исключает определенные вложенные структуры, например delete.

root
| -- big: array(nullable=true)
| | -- element: struct(containsNull=true)
| | | -- keep: string(nullable=true)

В соответствии с решением, которое я связал, которое пытается использовать pyspark.sql's to_json и from_json функций, это должно быть выполнено с чем-то вроде этого:

new_schema = StructType([StructField("big", ArrayType(StructType([
             StructField("keep", StringType())
])))])

test_df = df.withColumn("big", to_json(col("big"))).withColumn("big", from_json(col("big"), new_schema))

>>> test_df.printSchema()
root
| -- big: struct(nullable=true)
| | -- big: array(nullable=true)
| | | -- element: struct(containsNull=true)
| | | | -- keep: string(nullable=true)

>>> test_df.show()
+----+
| big|
+----+
|null|
+----+

Так что либо я не правильно следую его указаниям, либо это не работает. Как это сделать без udf?

Документация Pyspark to_json Документация Pyspark from_json

1 Ответ

1 голос
/ 05 октября 2019

Это должно работать, вам просто нужно настроить вашу new_schema так, чтобы она включала метаданные только для столбца 'big', а не для фрейма данных:

new_schema = ArrayType(StructType([StructField("keep", StringType())]))

test_df = df.withColumn("big", from_json(to_json("big"), new_schema))
...