Странное поведение в CSV-парсере Spark 2, когда включена опция multiLine - PullRequest
0 голосов
/ 15 мая 2018

При создании DataFrame из файла CSV, если включена опция multiLine, некоторые столбцы файла анализируются неправильно.

Здесь идет выполнение кода. Я постараюсь показать странное поведение в коде.

Сначала я загружаю файл с двумя переменными: df_ok загружает файл без опции multiLine, а df_ko загружает файл с включенной опцией multiLine. Файл, который я использую в качестве примера, имеет \r\n в качестве EOL, он закодирован в UTF-8 и имеет канал в качестве разделителя столбцов.

val df_ok = spark.read.format("csv").option("header", "true").option("delimiter", "|").load("/.../20180423_LSV.csv")
val df_ko = spark.read.format("csv").option("header", "true").option("delimiter", "|").option("multiLine", "true").load("/.../20180423_LSV.csv")

df_ok.printSchema
root
 |-- FILIALE: string (nullable = true)
 |-- IDMDB: string (nullable = true)
 |-- FIMAGIC: string (nullable = true)
 ...
 |-- STATUS: string (nullable = true)
 |-- LSV_TYPE: string (nullable = true)

df_ko.printSchema
root
 |-- FILIALE: string (nullable = true)
 |-- IDMDB: string (nullable = true)
 |-- FIMAGIC: string (nullable = true)
 ...
 |-- STATUS: string (nullable = true)
 : string (nullable = true)

 df_ko.columns
 res0: Array[String] = Array(FILIALE, IDMDB, FIMAGIC, FIVEHMAGIC, SITE, VEHICLE, DESC001, DESC002, LOCN, PROGRESS, CHASSIS, REGN, CUSDATE, INVDATE, STYPE, TARMAGIC, VEHMAGIC, COST, REGDATE, DELDATE, EXEC001, INVOICE, INVTOT, STATUS, "LSV")YPE

Первое, что я заметил, это то, что при использовании multiLine столбец LSV_TYPE исчезает. Вместо этого он показывает : string (nullable = true). При использовании df_ko.columns он показывает что-то действительно странное: ..., "LSV")YPE. Этого не происходит, когда я использую только \n в качестве EOL. Установка опции quote ничего не меняет.

Следующим, что я протестировал, был выбор столбца FILIALE:

df_ok.select($"FILIALE")
res1: org.apache.spark.sql.DataFrame = [FILIALE: string]

df_ok.columns.contains("FILIALE")
res2: Boolean = true

df_ko.select($"FILIALE")
org.apache.spark.sql.AnalysisException: cannot resolve '`FILIALE`' given input columns: [INVOICE, DESC001, COST, STYPE, PROGRESS, INVTOT, VEHICLE, REGDATE, TARMAGIC, STATUS, CUSDATE, LOCN, INVDATE, SITE, DELDATE, REGN, EXEC001, VEHMAGIC,, DESC002, FIVEHMAGIC, CHASSIS, FIMAGIC, FILIALE];;

Поэтому я подумал, что есть нечто большее, чем в названии столбца:

df_ko.columns.head.toArray
res68: Array[Char] = Array(, F, I, L, I, A, L, E)

df_ko.columns.head.toArray.foreach(c => println(c.toInt))
65279
70
73
76
73
65
76
69

Чем я проверял файл по-другому:

spark.read.text("/.../20180423_LSV.csv").collect.head.getString(0).toArray.foreach(c => println(c.toInt))
70
73
76
73
65
76

Тогда я был уверен, что проблема связана с анализом CSV. Добавление опции charset или использование однозначности в качестве значения опции parseLib ничего не меняет.

Файл, который я использую:

FILIALE|IDMDB|FIMAGIC|FIVEHMAGIC|SITE|VEHICLE|DESC001|DESC002|LOCN|PROGRESS|CHASSIS|REGN|CUSDATE|INVDATE|STYPE|TARMAGIC|VEHMAGIC|COST|REGDATE|DELDATE|EXEC001|INVOICE|INVTOT|STATUS|LSV_TYPE
XXXXXXXX|XX696209|XX696209|XX0|73|100284|XXXXXXXXXXXXX45XXXXXXXX|X|73X|X|XXX4503321X2361427|73XX100284||24X10X2005|X|696209|0|9592X7||22X10X2005|XXXX73|500228|10841X24|X|XX
XXXXXXXX|XX1454353|XX1454353|XX959136|73|100212|XXXXXXXXXXXXX45XXXXXXXXXXXXXXXXXX|XXXXXXXXXXXXX45XXXXXXXXXXXXXXXXXX|73X|X|XXX4503321X2096859|73XX100212||08X09X2005|X|1454353|959136|0|||XXXX73|500205|0|X|XX
XXXXXXXX|XX607020|XX607020|XX0|73|100097|XXXXXXXXXXXX50XXXXXXXXXXXXXX|X|73X|X|XXX4540001X0540628|8232XX33||17X02X2005|X|607020|0|10868X34|||XXXX73|500025|11750|X|XX
XXXXXXXX|XX1796002|XX1796002|XX0|73|100091|XXXXXXXXXXXX70XXXXXXXXXXX|X|73X|X|XXX4540011X072541X|73XX100091||21X01X2005|X|1796002|0|12457X44||19X01X2005|XXXX73|500010|13616X9|X|XX
XXXXXXXX|XX728637|XX728637|XX0|73|100046|XXXXXXXXXXXXX55XXXXXXXXXX|X|73X|X|XXX4503331X1326935|4059XX33||25X11X2005|X|728637|0|14425X76|22X02X2005|24X11X2005|XXXX73|500244|17500|X|XX
XXXXXXXX|XX555718|XX555718|XX0|73|100020|XXXXXXXXXXXXXXX45XXX|X|73X|X|XXX4524321X0392633|73XX100020||01X08X2005|X|555718|0|12897||29X07X2005|XXXX73|500173|17446X39|X|XX
XXXXXXXX|XX589182|XX589182|XX0|73|100270|XXXXXXXX1X1XXXXXXXX|X|73X|X|XXX4540301X0461656|73XX100270||19X09X2005|X|589182|0|13112X6||16X09X2005|XXXX73|500201|14998|X|XX
XXXXXXXX|XX1796399|XX1796399|XX0|73|100362|XXXXXXXXXXXXX45XXXXXXXXXXXXXX|XXXXXXXXXXXXX45XXXXXXXXXXXXXX|73X|X|XXX4503321X2775489|73XX100362||22X05X2006|X|1796399|0|10783X11||17X05X2006|XXXX73|500087|11976X24|X|XX
XXXXXXXX|XX1796399|XX1796399|XX0|73|100337|XXXXXXXXXXXXX45XXXXXXXXXXXXXX|XXXXXXXXXXXXX45XXXXXXXXXXXXXX|73X|X|XXX4503321X2654339|73XX100337||22X05X2006|X|1796399|0|10672X11||17X05X2006|XXXX73|500086|11976X24|X|XX
XXXXXXXX|XX340211|XX340211|XX0|73|100383|XXXXXXXX50XXXXXXXXXXX|X|73X|X|XXX4540001X0839774|2724XX33||05X06X2006|X|340211|0|13321|23X05X2006|02X06X2006|XXXX73|500099|10999X99|X|XX

Я выполнил свой код на HDP 2.6.4 с Spark 2.2.0.

У кого-нибудь есть обходной путь или представление о том, что происходит?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...