Pyspark читает CSV со схемой, проверкой заголовка и хранит поврежденные записи - PullRequest
2 голосов
/ 07 марта 2019

Я пытаюсь использовать pyspark csv reader по следующим критериям:

  • Чтение CSV в соответствии с типами данных в схеме
  • Убедитесь, что имена столбцов в заголовке и схеме совпадают
  • Хранить битые записи в новом поле

Вот что я пробовал.

file: ab.csv
------
a,b
1,2
3,four
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
                    columnNameOfCorruptRecord='broken')
print(df.show())

Выход:

+----+----+
|   a|   b|
+----+----+
|   1|   2|
|null|null|
+----+----+

Эта команда не сохраняет поврежденные записи. Если я добавлю broken к схема и удалить заголовок проверки команды работает с предупреждением.

DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True, 
                    columnNameOfCorruptRecord='broken')
print(df.show())

Выход:

WARN  CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
|   a|   b|broken|
+----+----+------+
|   1|   2|  null|
|null|null|3,four|
+----+----+------+

Это предполагаемое поведение или есть ошибка, которая нарушает первый пример? Есть ли лучший способ сделать это?

Еще одна вещь. Я хочу обработать правильно сформированные поля в поврежденных записях чтобы получить кадр данных, как это.

+--+----+------+
| a|   b|broken|
+--+----+------+
| 1|   2|  null|
| 3|null|3,four|
+--+----+------+

Должен ли я сделать дополнительный шаг после прочтения, чтобы получить это, или есть какие-то вариант, который я пропустил, чтобы быть более разрешительным.

1 Ответ

0 голосов
/ 08 марта 2019

Это правильное поведение по умолчанию.Если вы выводите схему, она неявно добавляет поле columnNameOfCorruptRecord в выходную схему, в противном случае вам нужно предоставить поле строкового типа columnNameOfCorruptRecord в определенной пользователем схеме или изменить имя столбца, например, как broken, и добавить то же имя в схему.

Невозможно обработать данные частично, как вы упомянули, для этого вам нужно написать свой собственный анализатор, расширяющий CSVFileFormat в spark.Для получения списка всех csvoptions, проверьте org / apache / spark / sql / execute / datasources / csv / CSVOptions.scala

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...