Scala Преобразование DF в RDD - ошибка java .lang.NumberFormatException: для входной строки: "age" - PullRequest
0 голосов
/ 03 августа 2020

Я делаю маленькие шаги в scala и искру, и я столкнулся с ошибкой, которую я не могу решить.

Я пытаюсь сопоставить файл csv с DF, но возвращает ошибка.

// Adding schema to RDDs - Initialization
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._

case class Employee(name: String, age: Long)
val employeeDF = spark.sparkContext.textFile.("./employee.txt").map(_.split(",")).map(attributes => Employee(attributes(0), attributes(1).trim.toInt)).toDF()
employeeDF.createOrReplaceTempView("employee")

var youngstersDF = spark.sql("SELECT name,age FROM employee WHERE age BETWEEN 18 AND 30")
youngstersDF.map(youngster => "Name: " + youngster(0)).show()

При попытке сопоставить имя возвращает ошибку, как описано ниже:

Возвращенная ошибка:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 21, 192.168.0.122, executor driver): java.lang.NumberFormatException: For input string: "age"

Содержимое файла: имя , возраст Джон, 28 лет, Эндрю, 36 лет, Кларк, 22 года, Кевин, 42 года

Я погуглил, но не повезло с решениями / ответами.

Кто-нибудь может помочь?

С огромной благодарностью, Ксави

Ответы [ 2 ]

0 голосов
/ 03 августа 2020

Я бы попробовал -

 def getType[T: scala.reflect.runtime.universe.TypeTag](obj: T) = scala.reflect.runtime.universe.typeOf[T]
    val path = getClass.getResource("/csv/employee.txt").getPath
    val ds = spark.read
      .schema(ScalaReflection.schemaFor[Employee].dataType.asInstanceOf[StructType])
      .option("header", true)
      .option("sep", ",")
      .csv(path)
      .as[Employee]
    println(getType(ds))
    /**
      * org.apache.spark.sql.Dataset[com.som.spark.learning.Employee]
      */
    ds.show(false)
    ds.printSchema()
    /**
      * +------+---+
      * |name  |age|
      * +------+---+
      * |John  |28 |
      * |Andrew|36 |
      * |Clarke|22 |
      * |Kevin |42 |
      * +------+---+
      *
      * root
      * |-- name: string (nullable = true)
      * |-- age: long (nullable = true)
      */
case class Employee(name: String, age: Long)
0 голосов
/ 03 августа 2020

Вам необходимо filter out the header из ваших данных при преобразовании в dataframe.

Example:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._

case class Employee(name: String, age: Long)
val employeeRDD = spark.sparkContext.textFile("./employee.txt")

//storing header string
val header=employeeRDD.first()

//filter out the header from the data
val employeeDF = employeeRDD.filter(r => r!=header).map(_.split(",")).map(attributes => Employee(attributes(0), attributes(1).trim.toInt)).toDF()

employeeDF.createOrReplaceTempView("employee")

sql("select * from employee").show()
//+------+---+
//|  name|age|
//+------+---+
//|  John| 28|
//|Andrew| 36|
//|Clarke| 22|
//| Kevin| 42|
//+------+---+

Просто к вашему сведению можно использовать spark.read.csv с заголовком параметра и передать схему при чтении.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val sch=new StructType().add("name",StringType).add("age",LongType)
val df=spark.read.option("header",true).option("delimiter",",").schema(sch).csv("./employee.txt")

df.show()
//+------+---+
//|  name|age|
//+------+---+
//|  John| 28|
//|Andrew| 36|
//|Clarke| 22|
//| Kevin| 42|
//+------+---+
...