Как правильно проверить тип данных CSV-данных в спарк? - PullRequest
1 голос
/ 05 июня 2019

В качестве входных данных для программы spark у нас есть файл JSON (который описывает определение схемы и ограничения, которые мы хотим проверить для каждого столбца), и я хочу выполнить некоторые проверки качества данных, такие как (не NULL, UNIQUE) и проверки типов данных также (хочет проверить, содержит ли файл csv данные в соответствии со схемой json или нет?).

Файл JSON:

{

"Идентификатор": "1",

"имя": "Сотрудник",

"источник": "местный",

"file_type": "Текст",

"sub_file_type": "CSV",

"разделитель": ""

"путь": "/ пользователь / все / dqdata / данные / emp.txt",

"столбцы": [

{"column_name": "empid", "datatype": "integer", "constraints": ["not null", "unique"], "values_permitted": ["1", "2"]},

{"column_name": "empname", "datatype": "string", "constraints": ["not null", "unique"], "values_permitted": ["1", "2"]},

{"column_name": "salary", "datatype": "double", "constraints": ["not null", "unique"], "values_permitted": ["1", "2"]},

{"column_name": "doj", "datatype": "date", "constraints": ["not null", "unique"], "values_permitted": ["1", "2"]},

{"column_name": "location", "string": "number", "constraints": ["not null", "unique"], "values_permitted": ["1", "2"]}

]

}

Пример ввода CSV:

EmpId, EmpName, Салар, д.р., место

1, а, 10000,11-03-2019, Пуна

* * 1 042 2, б, 10020,14-03-2019, Пуна * * один тысяча сорок три

3, а, 10010,15-03-2019, Пуна

а, 1,10010,15-03-2019, Пуна

Имейте в виду, что

1) намеренно я поставил неверные данные для empId и поля имени (проверьте последнюю запись). 2) Номер столбца в файле json не фиксирован?

Вопрос:

Как я могу убедиться, что входной файл данных содержит все записи в соответствии с данным типом данных (в JSON) или нет?

Мы попробовали следующие вещи:

1) Если мы попытаемся загрузить данные из CSV-файла, используя фрейм данных, применяя внешнюю схему, то программа spark немедленно выдаст какое-то исключение приведения (NumberFormatException и т. Д.), И это приведет к аварийному завершению программы. Но я хочу продолжить поток выполнения и записать конкретную ошибку как «Ошибка несоответствия типов данных для столбца empID». Вышеприведенный сценарий работает только тогда, когда мы вызываем какое-то действие RDD для фрейма данных, что, как мне показалось, странный способ проверки схемы.

Пожалуйста, ведите меня, как мы можем достичь этого в искре?

1 Ответ

0 голосов
/ 05 июня 2019

Я не думаю, что здесь есть бесплатный обед, вы должны написать этот процесс самостоятельно, но процесс, который вы можете сделать, это ...

  1. Считайте файл csv как Dataset изStrings чтобы все строки были хорошими
  2. Анализировать набор данных с помощью функции map, чтобы проверить наличие Null или проблем с типом данных для столбца
  3. Добавить дополнительные два столбца, boolean называется validRow и String называется message или description
  4. С парсером, упомянутым в '2.', сделайте какой-нибудь try/catch или Try/Success/Failure для каждогозначение в каждом столбце, перехватите исключение и установите соответственно столбцы validRow и description
  5. Выполните фильтрацию и запишите один DataFrame/DataSet, который успешен (флаг validRow установлен в True) в место успеха, и напишите ошибку DataFrame/DataSet в место ошибки
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...