Проверить файл CSV PySpark - PullRequest
       4

Проверить файл CSV PySpark

0 голосов
/ 21 ноября 2018

Я пытаюсь проверить файл CSV (количество столбцов на каждую запись).Согласно приведенной ниже ссылке, в Databricks 3.0 есть опция для обработки.

http://www.discussbigdata.com/2018/07/capture-bad-records-while-loading-csv.html

df = spark.read
  .option("badRecordsPath", "/data/badRecPath")
  .parquet("/input/parquetFile")

Однако я использую версию 2.3 spark и не могу использовать эту опцию.

Есть ли способ обнаружить плохие записи в файле csv при чтении как части pyspark и хотеть записать плохие записи в файл.

Схема не является статичной, поскольку мы обрабатываемданные нескольких таблиц и не могут жестко закодировать схему.

        df = spark.read.option("wholeFile", "true"). \
                        option("header", "true"). \
                        option("quote", "\""). \
                        csv("${table}/path/to/csv/file")

1 Ответ

0 голосов
/ 12 декабря 2018

Я не уверен, какие записи вы называете плохими, поскольку мы не можем видеть ваши входные данные.Исходя из моего предположения, допустим, что у нас есть входной файл ниже, имеющий пять столбцов.

col1,col2,col3,col4,col5
1,ABC,YYY,101,USA
2,ABC,ZZZ,102,USA
3,ABC,,,USA
4,ABC,GGG,104,USA
5,ABC,PPP,105

и строка №.3 имеет несколько пустых столбцов, а в строке № 5 меньше столбцов.поэтому я не хочу загружать эти две записи в мой фрейм данных.

PATH_TO_FILE = "file:///user/vikrant/hivespark/userinput"

df = sc.textFile(PATH_TO_FILE)\
           .mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))\
           .map(lambda x: [i for i in x if len(i)!= 0]) \
           .filter(lambda line: len(line) > 4 and line[0] != 'col1') \
           .toDF(['Col1','Col2','Col3','Col4','Col5'])


>>> df.show();
+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|
+----+----+----+----+----+
|   1| ABC| YYY| 101| USA|
|   2| ABC| ZZZ| 102| USA|
|   4| ABC| GGG| 104| USA|
+----+----+----+----+----+

и, если вы хотите извлечь плохие записи из вашего входного файла:

badrecords = sc.textFile(PATH_TO_FILE)\
           .mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))\
           .map(lambda x: [i for i in x if len(i)!= 0]) \
           .filter(lambda line: len(line) < 5 and line[0] != 'col1')

>>> badrecords.take(10)
[['3', 'ABC', 'USA'], ['5', 'ABC', 'PPP', '105']]

Дайте мне знать, еслиэто работает для вас или помогает!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...