Я вижу, что вы извлекли документы JSON из Azure CosmosDB и преобразовали их в PySpark DataFrame, но вложенный документ или массив JSON не удалось преобразовать как объект JSON в столбце DataFrame, как вы ожидали, поскольку не определен тип JSONв модуле pyspark.sql.types
, как показано ниже.
![enter image description here](https://i.stack.imgur.com/cJgoN.png)
Я искал документ PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame
, который подойдет для вашего решения.текущий случай, даже такой, как вы хотите, пока я пытался его решить.
В приведенном выше документе показано, как использовать ArrayType
, StructType
, StructField
и другие базовые типы данных PySpark для преобразованияСтрока JSON в столбце с комбинированным типом данных, который может быть легче обработан в PySpark с помощью определения схемы столбца и UDF.
Вот сводка примера кода.Надеюсь, это поможет.
source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
JSON считывается во фрейм данных через sqlContext.Вывод:
+------+--------------------+
|attr_1| attr_2|
+------+--------------------+
| 1|[{"a":1,"b":1},{"...|
| 2|[{"a":3,"b":3},{"...|
+------+--------------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: string (nullable = true)
Затем, чтобы преобразовать столбец attr_2
через схему определения столбца и UDF.
# Function to convert JSON array string to a list
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))
# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()
Вывод выглядит следующим образом:
+------+--------------+
|attr_1| attr_2|
+------+--------------+
| 1|[[1,1], [2,2]]|
| 2|[[3,3], [4,4]]|
+------+--------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: integer (nullable = false)
| | |-- b: integer (nullable = false)