У меня есть фрейм данных, в котором есть столбец, представляющий собой строку JSON
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
sc = SparkSession.builder.getOrCreate()
l = [
(1, """{"key1": true, "nested_key": {"mylist": ["foo", "bar"], "mybool": true}})"""),
(2, """{"key1": true, "nested_key": {"mylist": "", "mybool": true}})"""),
]
df = sc.createDataFrame(l, ["id", "json_str"])
, и я хочу проанализировать столбец json_str
с from_json
, используя схему
schema = StructType([
StructField("key1", BooleanType(), False),
StructField("nested_key", StructType([
StructField("mylist", ArrayType(StringType()), False),
StructField("mybool", BooleanType(), False)
]))
])
df = df.withColumn("data", F.from_json(F.col("json_str"), schema))
df.show(truncate=False)
+---+--------------------------+
|id |data |
+---+--------------------------+
|1 |[true, [[foo, bar], true]]|
|2 |[true, [, true]] |
+---+--------------------------+
Как можно видеть, вторая строка не соответствует схеме в schema
, поэтому она пуста, хотя я передал False
в nullable
в StructField.Для моего конвейера важно, чтобы, если есть данные, которые не соответствуют схеме, определили, что оповещение каким-то образом поднялось, но я не уверен в том, как лучше всего это сделать в Pyspark.Реальные данные имеют много-много ключей, некоторые из них вложены, поэтому проверка каждого с какой-либо формой isNan
неосуществима, и, поскольку мы уже определили схему, кажется, ее следует использовать, чтобы использовать ее.
Если это важно, мне не обязательно проверять схему всего фрейма данных, я действительно после проверки схемы столбца StructType