Обработка пустых массивов в pySpark (необязательный двоичный элемент (UTF8) не является группой) - PullRequest
2 голосов
/ 19 февраля 2020

У меня есть json -подобная структура в spark, которая выглядит следующим образом:

>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string

в зависимости от раздела, some-array может быть пустым массивом для всех id. Когда эта случайная искра выводит следующую схему:

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string

Конечно, это проблема, если я хочу прочитать несколько разделов, потому что spark не может объединить схемы. Я попытался определить схему вручную, поэтому проблем не должно быть

>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string

Пока все хорошо, но когда я пытаюсь собрать данные, я получаю сообщение об ошибке:

>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group

Я не понимаю, почему это не удается. Схема не должна быть причиной несовместимости. Если вам интересно, сбор данных без указания схемы работает.

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string   # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works

Редактировать

Вот воспроизводимый пример в python

from pyspark.sql.types import *


myschema = StructType([
   StructField('id', StringType())
 , StructField( 'some-array'
              , ArrayType(StructType([
                  StructField('array-field-1', StringType())
                , StructField('array-field-2', StringType())
                ])
              ))
  ])

path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO) # write without schema

read_error = spark.read.schema(myschema).parquet(path_writeKO) # read with schema
read_error.collect() # Fails!!

Ответы [ 2 ]

1 голос
/ 24 февраля 2020

Решение, которое я нашел, заключается в установке опции dropFieldIfAllNull на True при чтении файла json. Это приводит к исчезновению поля с пустым массивом, что облегчает объединение схемы.

>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id:string

Теперь вывод нежелательного типа не будет применяться, и при чтении нескольких разделов одного файла опция mergeSchema сможет читать все файлы без коллизий.

1 голос
/ 23 февраля 2020

Конечно, это проблема, если я хочу прочитать несколько разделов, потому что spark не может объединить схемы. Я попытался определить схему вручную, поэтому проблем не должно быть

Боюсь, что не существует такой схемы, которая могла бы анализировать одновременно эти два случая. Данные {"id": "OK", "some-array": [{"array-field-1":"f1", "array-field-2":"f2"}]} могут быть проанализированы только с:

good_schema = StructType([
  StructField('id', StringType()), 
  StructField( 'some-array', 
              ArrayType(StructType([
                StructField('array-field-1', StringType()),
                StructField('array-field-2', StringType())
              ])
          ))
  ])

, когда {"id": "OK", "some-array": []} с:

bad_schema = StructType([
  StructField('id', StringType()), 
  StructField('some-array', ArrayType(StringType()))
])

Поэтому один из вариантов - прочитать эти два каталога с разными схемами.

Я не понимаю, почему это не удается. Схема не должна быть причиной несовместимости.

Как объяснено выше, данные несовместимы со схемой.

В случае, если вам интересно, сбор данных без указания схемы работает.

Это ожидаемое поведение, поскольку, если не указана явная схема, Spark попытается обнаружить it.

Предлагаемое решение

Единственное решение, которое я могу придумать, - это обрабатывать поле some-array как строку. Я не знаю, возможно ли это в вашей системе, хотя вы могли бы реализовать это, cast, вставив some-array в строку для обеих схем / разделов.

Это преобразование может быть выполнено с использованием как минимум двух параметров :

good_data_df = spark.read.schema(good_schema).parquet(...)
bad_data_df = spark.read.schema(bad_schema).parquet(...)
  1. Считать оба набора данных и преобразовать поле some-array в строку, а затем сохранить результаты в одном общем каталоге с помощью:
good_data_df = good_data_df.withColumn("some-array", col("some-array").cast("string"))
bad_data_df = bad_data_df.withColumn("some-array", col("some-array").cast("string"))

good_data_df.union(bad_data_df).write.mode("overwrite").parquet("parquet_path")
Выполните указанное выше преобразование во время выполнения, пропустив шаг перезаписи.

Наконец, вы можете загрузить some-array в виде строки и затем преобразовать ее в array_schema, используя функцию from_json:

from pyspark.sql.types import *
from pyspark.sql.functions import from_json

array_schema = ArrayType(StructType([
                StructField('array-field-1', StringType()),
                StructField('array-field-2', StringType())]))

# we will use this for both partitions
generic_schema = StructType([
  StructField('id', StringType()), 
  StructField('some-array', StringType())
])

parquet_path = "/tmp/60297547/parquet"
good_data = "{'id': 'OK', 'some-array': \"[{'array-field-1':'f1a','array-field-2':'f2a'},{'array-field-1':'f1b','array-field-2':'f2b'}]\"}"
bad_data = "{'id': 'OK', 'some-array': '[]'}"

# putting bad and good partitions into the same dataset where some-array is string
rdd = sc.parallelize([bad_data, good_data])
df = spark.read.json(rdd)
df.write.mode("overwrite").parquet(parquet_path)

final_df = spark.read.schema(generic_schema).parquet(parquet_path)
final_df = final_df.withColumn("some-array", from_json(final_df["some-array"], array_schema))

final_df.show(10, False)

# +---+------------------------+
# |id |some-array              |
# +---+------------------------+
# |OK |[[f1a, f2a], [f1b, f2b]]|
# |OK |[]                      |
# +---+------------------------+

final_df.printSchema()
# root
#  |-- id: string (nullable = true)
#  |-- some-array: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- array-field-1: string (nullable = true)
#  |    |    |-- array-field-2: string (nullable = true)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...