Конечно, это проблема, если я хочу прочитать несколько разделов, потому что spark не может объединить схемы. Я попытался определить схему вручную, поэтому проблем не должно быть
Боюсь, что не существует такой схемы, которая могла бы анализировать одновременно эти два случая. Данные {"id": "OK", "some-array": [{"array-field-1":"f1", "array-field-2":"f2"}]}
могут быть проанализированы только с:
good_schema = StructType([
StructField('id', StringType()),
StructField( 'some-array',
ArrayType(StructType([
StructField('array-field-1', StringType()),
StructField('array-field-2', StringType())
])
))
])
, когда {"id": "OK", "some-array": []}
с:
bad_schema = StructType([
StructField('id', StringType()),
StructField('some-array', ArrayType(StringType()))
])
Поэтому один из вариантов - прочитать эти два каталога с разными схемами.
Я не понимаю, почему это не удается. Схема не должна быть причиной несовместимости.
Как объяснено выше, данные несовместимы со схемой.
В случае, если вам интересно, сбор данных без указания схемы работает.
Это ожидаемое поведение, поскольку, если не указана явная схема, Spark попытается обнаружить it.
Предлагаемое решение
Единственное решение, которое я могу придумать, - это обрабатывать поле some-array
как строку. Я не знаю, возможно ли это в вашей системе, хотя вы могли бы реализовать это, cast
, вставив some-array
в строку для обеих схем / разделов.
Это преобразование может быть выполнено с использованием как минимум двух параметров :
good_data_df = spark.read.schema(good_schema).parquet(...)
bad_data_df = spark.read.schema(bad_schema).parquet(...)
- Считать оба набора данных и преобразовать поле
some-array
в строку, а затем сохранить результаты в одном общем каталоге с помощью:
good_data_df = good_data_df.withColumn("some-array", col("some-array").cast("string"))
bad_data_df = bad_data_df.withColumn("some-array", col("some-array").cast("string"))
good_data_df.union(bad_data_df).write.mode("overwrite").parquet("parquet_path")
Выполните указанное выше преобразование во время выполнения, пропустив шаг перезаписи.
Наконец, вы можете загрузить some-array
в виде строки и затем преобразовать ее в array_schema
, используя функцию from_json
:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
array_schema = ArrayType(StructType([
StructField('array-field-1', StringType()),
StructField('array-field-2', StringType())]))
# we will use this for both partitions
generic_schema = StructType([
StructField('id', StringType()),
StructField('some-array', StringType())
])
parquet_path = "/tmp/60297547/parquet"
good_data = "{'id': 'OK', 'some-array': \"[{'array-field-1':'f1a','array-field-2':'f2a'},{'array-field-1':'f1b','array-field-2':'f2b'}]\"}"
bad_data = "{'id': 'OK', 'some-array': '[]'}"
# putting bad and good partitions into the same dataset where some-array is string
rdd = sc.parallelize([bad_data, good_data])
df = spark.read.json(rdd)
df.write.mode("overwrite").parquet(parquet_path)
final_df = spark.read.schema(generic_schema).parquet(parquet_path)
final_df = final_df.withColumn("some-array", from_json(final_df["some-array"], array_schema))
final_df.show(10, False)
# +---+------------------------+
# |id |some-array |
# +---+------------------------+
# |OK |[[f1a, f2a], [f1b, f2b]]|
# |OK |[] |
# +---+------------------------+
final_df.printSchema()
# root
# |-- id: string (nullable = true)
# |-- some-array: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- array-field-1: string (nullable = true)
# | | |-- array-field-2: string (nullable = true)