tl; dr
Короткий ответ - нет, нет способа динамически вывести схему для каждой строки и получить столбец, в котором разные строки имеют разные схемы.
Однако есть способ вывести желаемую строку json и согласовать разные json в общую схему с богатым типом
Подробно
Если бы это было разрешено, это было бы мучительно медленно, но, что более важно, это не разрешено, потому что это нарушает реляционную модель, которая позволяет SparkSQL работать согласованно.
Кадр данных состоит из столбцов (полей), а столбец имеет толькоодин тип данных;тип данных представляет весь столбец.Это не является строго обязательным для исполнения в Pyspark, учитывая природу Python, но важно во время выполнения, чтобы этот оператор все еще применялся.
В вашем примере, если вы хотите project атрибут City
с чем-токак d.Body.City
, тогда это должно существовать как для Альфреда, так и для Эмбер.По крайней мере, метаданные для этого поля должны существовать, даже если нет значения.Механизм выполнения должен быстро узнать, является ли путь неверным или нет, чтобы избежать бессмысленного сканирования каждой строки.
Существует несколько способов согласования нескольких типов в одном столбце (я уверен, что я могу сделать больше)t думать о):
- использовать вариант / union / option type (например, объединить все общие и необычные схемы json вместе)
- сериализовать его в нечто, например, в строку json (это то место, где вы начинаете перед применением этой jsonschema, отлично подходит для передачи данных, не очень хорошо для анализа)
- превратить его в супертип, коробочный или универсальный объект (как в RDD), имеющий наименьший общий знаменательповедение / интерфейс / свойство (теряет метаданные и атрибуты подтипов)
- Не храните его как один тип, например, храните отдельные варианты в разных столбцах и используйте разные схемы json для каждого
Мне нравится (1) в этом случае, но (4) может быть допустимым в качестве промежуточного шага к поиску универсальной схемы.
Ваш пример "обычной" схемы json больше похож на option (3).Внутри карты, которую вы назвали «d» (я полагаю, потому что это диктат?) Информация о полях недоступна без сканирования данных.
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Я понимаю, что это всего лишь промежуточный шаг кдобавление нового столбца, содержащего Body
, но для этого необходимо перечислить все возможные ключи на этой карте в более полезную схему.
Решение
Универсальный (общий)схема не является общей картой string -> string
, я думаю, что она более полезна, как показано ниже.Он близок к тому, что вы пробовали изначально, но не является динамическим и действителен для обеих строк.Обратите внимание, что nullable
является значением по умолчанию True
для всех атрибутов
schema_body = StructType([
StructField("City", StringType()),
StructField("Country", StringType()),
StructField("Weight", IntegerType()),
StructField("Height", IntegerType())
])
df = df.withColumn("Body", from_json("d.Body", schema_body))
df.printSchema()
root
|-- Name: string (nullable = false)
|-- d: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- Body: struct (nullable = true)
| |-- City: string (nullable = true)
| |-- Country: string (nullable = true)
| |-- Weight: integer (nullable = true)
| |-- Height: integer (nullable = true)
df.show(2, False)
+------+---------------------------------------------------------------+---------------------+
|Name |d |Body |
+------+---------------------------------------------------------------+---------------------+
|Amber |Map(Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1)|[Oregon,US,null,null]|
|Alfred|Map(Body -> {"Weight": 80, "Height": 176}, BodyType -> 2) |[null,null,80,176] |
+------+---------------------------------------------------------------+---------------------+
Теперь вы можете легко добраться до Body.City
, выбрав d.Body.City
, не беспокоясь о том, какие строки имеют город.
Для вашего следующего шага вы можете вернуть его обратно в строку json
df = df.withColumn("Body", to_json("d.Body"))
Вы также можете объединить его с предыдущим шагом
df = df.withColumn("Body", to_json(from_json("d.Body", schema_body)))
df.printSchema()
root
|-- Name: string (nullable = false)
|-- BodyAttributes: struct (nullable = true)
| |-- Body: string (nullable = true)
| |-- BodyType: integer (nullable = true)
|-- Body: string (nullable = true)
df.show(2, False)
+------+---------------------------------------+--------------------------------+
|Name |BodyAttributes |Body |
+------+---------------------------------------+--------------------------------+
|Amber |[{"City": "Oregon", "Country": "US"},1]|{"City":"Oregon","Country":"US"}|
|Alfred|[{"Weight": 80, "Height": 176},2] |{"Weight":80,"Height":176} |
+------+---------------------------------------+--------------------------------+
Примечаниепри преобразовании его обратно в строку json эти значения NULL исчезают.Также теперь jsonstring так легко записать в файл, как вы хотели.
Идем дальше
Если бы вы делали это как часть процесса, чтобы сделать данные доступными для анализа, составления отчетов или других целей, я бы сделал что-то вроде этого
schema = StructType([
StructField('Name',StringType(), False),
StructField(
'd',
StructType([
StructField("Body", StringType()),
StructField("BodyType", IntegerType())
])
)
])
df = spark.createDataFrame(rdd, schema)
df = df.withColumn(
"Body",
from_json("d.Body", schema_body)
).withColumn(
"BodyType",
col("d.BodyType")
).drop("d")
df.printSchema()
root
|-- Name: string (nullable = false)
|-- Body: struct (nullable = true)
| |-- City: string (nullable = true)
| |-- Country: string (nullable = true)
| |-- Weight: integer (nullable = true)
| |-- Height: integer (nullable = true)
|-- BodyType: integer (nullable = true)
df.show(2, False)
+------+---------------------+--------+
|Name |Body |BodyType|
+------+---------------------+--------+
|Amber |[Oregon,US,null,null]|1 |
|Alfred|[null,null,80,176] |2 |
+------+---------------------+--------+
Затем вы можете выбрать Body.City
, Body.Country
, Body.Weight,
Body.Height`
Вы могли бы сделать еще один шаг, но это действительно зависело бы от того, сколько из этих возможных ключей тела существуети насколько оно редкое.
df = df.withColumn(
"City", col("Body.City")
).withColumn(
"Country", col("Body.Country")
).withColumn(
"Weight", col("Body.Weight")
).withColumn(
"Height", col("Body.Height")
).drop("Body")
df.printSchema()
root
|-- Name: string (nullable = false)
|-- BodyType: integer (nullable = true)
|-- City: string (nullable = true)
|-- Country: string (nullable = true)
|-- Weight: integer (nullable = true)
|-- Height: integer (nullable = true)
df.show(2, False)
+------+--------+------+-------+------+------+
|Name |BodyType|City |Country|Weight|Height|
+------+--------+------+-------+------+------+
|Amber |1 |Oregon|US |null |null |
|Alfred|2 |null |null |80 |176 |
+------+--------+------+-------+------+------+