Я начинаю изучать искры со scala .pardon для моего разбитого английского ... Мне нужно написать программу для синтаксического анализа файла с разделителями и фиксированной шириной в Dataframe с использованием spark-scala Dataframe Api. Также, если входные данные повреждены, программа должна обработать ниже заданный способ:
A:ignoring the input data
B:investigate the error in input
C:stop on error
Для достижения вышеуказанной цели я успешно выполнил анализ с обработкой исключений для файла с разделителями с использованием параметров DataFrame Api. Но я не знаю, как применить ту же технику для файла с фиксированной шириной. Я использую версию Spark 2.4.3.
// predefined schema used in program
val schema = new StructType()
.add("empno",IntegerType,true)
.add("ename",StringType,true)
.add("designation",StringType,true)
.add("manager",StringType,true)
.add("hire_date",StringType,true)
.add("salary",DoubleType,true)
.add("deptno",IntegerType,true)
.add("_corrupt_record", StringType, true)
// parse csv file into DataFrame Api
// option("mode","PERMISSIVE") used to handle corrupt record
val textDF =sqlContext.read.format("csv").option("header", "true").schema(schema).option("mode", "PERMISSIVE").load("empdata.csv")
textDF.show
// program for fixed width line
// created lsplit method to split line into list of tokens based on width input / string
def lsplit(pos: List[Int], str: String): List[String] = {
val (rest, result) = pos.foldLeft((str, List[String]())) {
case ((s, res),curr) =>
if(s.length()<=curr)
{
val split=s.substring(0).trim()
val rest=""
(rest, split :: res)
}
else if(s.length()>curr)
{
val split=s.substring(0, curr).trim()
val rest=s.substring(curr)
(rest, split :: res)
}
else
{
val split=""
val rest=""
(rest, split :: res)
}
}
// list is reversed
result.reverse
}
// create case class to hold parsed data
case class EMP(empno:Int,ename:String,designation:String,manager:String,hire_dt:String,salary:Double,deptno:Int)
// create variable to hold width length
val sizeOfColumn=List(4,4,5,4,10,8,2);
// code to transform string to case class record
val ttRdd=textDF.map {
x =>
val row=lsplit(sizeOfColumn,x.mkString)
EMP(row(0).toInt,row(1),row(2),row(3),row(4).toDouble,row(5).toInt)
}
Code works fine for proper data but fails if incorrect data comes in file.
for e.g: "empno" column has some non-integer data..program throws exception NumberFormatException..
The program must handle if actual data in file does not match the specified schema as handled in delimited file.
Пожалуйста, помогите мне здесь. Мне нужно использовать тот же метод для файла фиксированной ширины, что и для файла с разделителями.