Spark-Scala: Parse Фиксированная линия ширины в Dataframe Api с обработкой исключений - PullRequest
0 голосов
/ 05 июля 2019

Я начинаю изучать искры со scala .pardon для моего разбитого английского ... Мне нужно написать программу для синтаксического анализа файла с разделителями и фиксированной шириной в Dataframe с использованием spark-scala Dataframe Api. Также, если входные данные повреждены, программа должна обработать ниже заданный способ:

A:ignoring the input data
B:investigate the error in input
C:stop on error

Для достижения вышеуказанной цели я успешно выполнил анализ с обработкой исключений для файла с разделителями с использованием параметров DataFrame Api. Но я не знаю, как применить ту же технику для файла с фиксированной шириной. Я использую версию Spark 2.4.3.

// predefined schema used in program
val schema = new StructType()
.add("empno",IntegerType,true)
.add("ename",StringType,true)
.add("designation",StringType,true)
.add("manager",StringType,true)
.add("hire_date",StringType,true)
.add("salary",DoubleType,true)
.add("deptno",IntegerType,true)
.add("_corrupt_record", StringType, true)

// parse csv file into DataFrame Api
// option("mode","PERMISSIVE") used to handle corrupt record
val textDF =sqlContext.read.format("csv").option("header", "true").schema(schema).option("mode", "PERMISSIVE").load("empdata.csv")
textDF.show

// program for fixed width line

// created lsplit method to split line into list of tokens based on width input / string

def lsplit(pos: List[Int], str: String): List[String] = {
val (rest, result) = pos.foldLeft((str, List[String]())) {
case ((s, res),curr) =>
    if(s.length()<=curr)
    {
    val split=s.substring(0).trim()
    val rest=""
    (rest, split :: res)
    }
    else if(s.length()>curr)
    {
    val split=s.substring(0, curr).trim()
    val rest=s.substring(curr)
    (rest, split :: res)
    }
    else
    {
    val split=""
    val rest=""
    (rest, split :: res)
    }
}
// list is reversed
result.reverse
}
// create case class to hold parsed data
case class EMP(empno:Int,ename:String,designation:String,manager:String,hire_dt:String,salary:Double,deptno:Int)


// create variable to hold width length
val sizeOfColumn=List(4,4,5,4,10,8,2);

// code to transform string to case class record
val ttRdd=textDF.map { 
    x => 
    val row=lsplit(sizeOfColumn,x.mkString) 
    EMP(row(0).toInt,row(1),row(2),row(3),row(4).toDouble,row(5).toInt)
}


Code works fine for proper data but fails if incorrect data comes in file.
for e.g: "empno" column has some non-integer data..program throws exception NumberFormatException..
The program must handle if actual data in file does not match the specified schema as handled in delimited file.

Пожалуйста, помогите мне здесь. Мне нужно использовать тот же метод для файла фиксированной ширины, что и для файла с разделителями.

1 Ответ

0 голосов
/ 08 июля 2019

Это вроде как очевидно.

Вы смешиваете свой подход с опцией API "permissive".

Разрешающий будет обнаруживать ошибки, такие как неправильный тип данных. Тогда ваш собственный процесс lsplit все еще выполняется и может получить нулевое исключение. Например. Если я добавлю в empnum "YYY", это будет заметно.

Если тип данных в порядке и длина неправильная, вы в большинстве случаев обрабатываете правильно, но поля искажены.

Ваш lsplit должен быть более надежным, и вам нужно проверить, существует ли там ошибка или перед тем, как вызывать, не вызывать.

Первый случай

+-----+-----+---------------+
|empno|ename|_corrupt_record|
+-----+-----+---------------+
| null| null|      YYY,Gerry|
| 5555|Wayne|           null|
+-----+-----+---------------+

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 1 times, most recent failure: Lost task 0.0 in stage 30.0 (TID 30, localhost, executor driver): java.lang.NumberFormatException: For input string: "null"

Второй случай

+------+-----+---------------+
| empno|ename|_corrupt_record|
+------+-----+---------------+
|444444|Gerry|           null|
|  5555|Wayne|           null|
+------+-----+---------------+

res37: Array[EMP] = Array(EMP(4444,44Ger), EMP(5555,Wayne))

Короче говоря, некоторая работа, и на самом деле не нужен заголовок.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...