spark.createDataFrame создает DF, но df.collect.foreach (println) завершается ошибкой с RuntimeException - PullRequest
0 голосов
/ 28 мая 2019

Версия Spark: Scala 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_211)

В следующем коде spark.createDataFrame создает DF, но кадр данных бесполезен (я не могу его распечатать или что-то с ним сделать).

val fieldSeparator = '\u001D' // 1D = group separator; 21 = !; 7C = |;
val recordSeparator = '\u001E' // 1E = record separator

val myRDD = dataSet.toDF().rdd.take(10)
    .map(r => r.mkString(fieldSeparator.toString))
val schema = new StructType()
    .add("displayname", StringType, true)
    .add("reputation", IntegerType, true)
    .add("numberOfPosts", LongType, true)
    .add("score", DoubleType, true)

println("---------- Creating DF from RDD: --------------")
val df = spark.createDataFrame(
    spark.sparkContext
         .parallelize(myRDD.map(r => r.split(fieldSeparator.toString)))
         .map(a => Row.fromSeq(a)),
    schema
)
println("---------- Successfully created DF from RDD (?) --------------")

println(df.getClass) // this prints: class org.apache.spark.sql.Dataset
df.printSchema()
/*
   root
   |-- displayname: string (nullable = true)
   |-- reputation: integer (nullable = true)
   |-- numberOfPosts: long (nullable = true)
   |-- score: double (nullable = true)
*/
myRDD.foreach(println) // prints all records correctly
df.collect.foreach(println)
// this fails with error: RuntimeException: java.lang.String is not a valid
// external type for schema of int ???

Результат должен совпадать с myRDD.foreach(println), который отлично работает.

1 Ответ

1 голос
/ 28 мая 2019

Давайте сначала создадим минимальный, воспроизводимый экземпляр вашей проблемы.Кстати, это то, что вы должны стараться делать каждый раз, когда задаете вопрос; -)

// A RDD of string
val rdd = sc.parallelize(Seq("oli,15,56,0.5", "you,45,49987787,0.4"))

// your schema
val schema = new StructType() 
    .add("displayname", StringType, true)
    .add("reputation", IntegerType, true)
    .add("numberOfPosts", LongType, true)
    .add("score", DoubleType, true)

// Now, let's try to create a dataframe
val rddOfRows = rdd.map(_.split(",")).map(Row.fromSeq(_))
val df = spark.createDataFrame(rddOfRows, schema)
// we can print its schema
df.printSchema
root
 |-- displayname: string (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- numberOfPosts: long (nullable = true)
 |-- score: double (nullable = true)

// but show triggers the exception you mentioned
df.show
  java.lang.RuntimeException: java.lang.String is not a valid external type for
  schema of int

Почему?Вы должны помнить, что искра ленива.Пока вы не собираете и не пишете данные, spark ничего не делает.Когда вы используете createDataFrame, ничего не происходит.Вот почему вы не получаете никаких ошибок.Когда вы печатаете схему, spark просто печатает предоставленную вами схему.Тем не менее, когда я вызываю show, я спрашиваю spark что-то сделать, и это вызывает все зависимые вычисления.

Проблема, которую вы видите, состоит в том, что spark ожидает int, но вы предоставляете строку.Spark не приводит ваши данные при создании кадра данных.У вас есть несколько возможностей решить вашу проблему.Одним из решений может быть забрасывание полей заранее так:

val rddOfRow = rdd
  .map(_.split(","))
  .map(_ match { case Array(a, b, c, d) => (a, b.toInt, c.toLong, d.toDouble) })
  .map(Row.fromTuple(_))
// and the rest of the code remains unchanged
...