Я пытаюсь использовать pyspark csv reader по следующим критериям:
- Чтение CSV в соответствии с типами данных в схеме
- Убедитесь, что имена столбцов в заголовке и схеме совпадают
- Хранить битые записи в новом поле
Вот что я пробовал.
file: ab.csv
------
a,b
1,2
3,four
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
columnNameOfCorruptRecord='broken')
print(df.show())
Выход:
+----+----+
| a| b|
+----+----+
| 1| 2|
|null|null|
+----+----+
Эта команда не сохраняет поврежденные записи. Если я добавлю broken
к
схема и удалить заголовок проверки команды
работает с предупреждением.
DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True,
columnNameOfCorruptRecord='broken')
print(df.show())
Выход:
WARN CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
| a| b|broken|
+----+----+------+
| 1| 2| null|
|null|null|3,four|
+----+----+------+
Это предполагаемое поведение или есть ошибка, которая нарушает первый пример?
Есть ли лучший способ сделать это?
Еще одна вещь. Я хочу обработать правильно сформированные поля в поврежденных записях
чтобы получить кадр данных, как это.
+--+----+------+
| a| b|broken|
+--+----+------+
| 1| 2| null|
| 3|null|3,four|
+--+----+------+
Должен ли я сделать дополнительный шаг после прочтения, чтобы получить это, или есть какие-то
вариант, который я пропустил, чтобы быть более разрешительным.