Проверка схемы столбца с помощью StructType в Pyspark 2.4 - PullRequest
0 голосов
/ 19 сентября 2019

У меня есть фрейм данных, в котором есть столбец, представляющий собой строку JSON

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

sc = SparkSession.builder.getOrCreate()

l = [
    (1, """{"key1": true, "nested_key": {"mylist": ["foo", "bar"], "mybool": true}})"""),
    (2, """{"key1": true, "nested_key": {"mylist": "", "mybool": true}})"""),
    ]
df = sc.createDataFrame(l, ["id", "json_str"])

, и я хочу проанализировать столбец json_str с from_json, используя схему


schema = StructType([
    StructField("key1", BooleanType(), False),
    StructField("nested_key", StructType([
        StructField("mylist", ArrayType(StringType()), False),
        StructField("mybool", BooleanType(), False)
     ]))
])

df = df.withColumn("data", F.from_json(F.col("json_str"), schema))
df.show(truncate=False)
+---+--------------------------+
|id |data                      |
+---+--------------------------+
|1  |[true, [[foo, bar], true]]|
|2  |[true, [, true]]          |
+---+--------------------------+

Как можно видеть, вторая строка не соответствует схеме в schema, поэтому она пуста, хотя я передал False в nullable в StructField.Для моего конвейера важно, чтобы, если есть данные, которые не соответствуют схеме, определили, что оповещение каким-то образом поднялось, но я не уверен в том, как лучше всего это сделать в Pyspark.Реальные данные имеют много-много ключей, некоторые из них вложены, поэтому проверка каждого с какой-либо формой isNan неосуществима, и, поскольку мы уже определили схему, кажется, ее следует использовать, чтобы использовать ее.

Если это важно, мне не обязательно проверять схему всего фрейма данных, я действительно после проверки схемы столбца StructType

1 Ответ

0 голосов
/ 19 сентября 2019

Проверьте параметр options: https://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html?highlight=from_json#pyspark.sql.functions.from_json

Это немного расплывчато, но оно позволяет вам передать dict базовому методу здесь: https://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html?highlight=from_json#pyspark.sql.DataFrameReader.json

Вы можете успешно передать что-то вроде options={'mode' : 'FAILFAST'}.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...