Используя функцию has_column
, определите здесь с помощью zero323 и общие рекомендации по добавлению пустых столбцов либо
from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *
if has_column(df_record, "key3.ResponseType"):
df_basicInfo = df_record.withColumn("ResponseType", col("key3.ResponseType"))
else:
# Adjust types according to your needs
df_basicInfo = df_record.withColumn("ResponseType", lit(None).cast("string"))
и повторите длякаждый столбец, который вам нужен, или
df_record.withColumn(
"ResponseType",
when(
lit(has_column(df_record, "key3.ResponseType")),
col("key3.ResponseType")
).otherwise(lit(None).cast("string"))
Настройте типы в соответствии с вашими требованиями и повторите процедуру для остальных столбцов.
В качестве альтернативы определите схему, охватывающую все требуемые типы:
schema = StructType([
StructField("key1", StringType()),
StructField("key2", StringType()),
StructField("key2", StructType([
StructField("ResponseType", StringType()),
StructField("someIndicator", StringType()),
]))
])
df_record = spark.read.schema(schema).json("path/to/file.JSON",multiLine=True)
(еще раз настройте типы) и используйте свой текущий код.