Как создать Spark DataFrame из RDD [Row], когда Row содержит сложные типы - PullRequest
0 голосов
/ 17 марта 2019

У меня есть RDD[HbaseRecord], который содержит пользовательский комплексный тип Name. Оба класса определены ниже:

class HbaseRecord(
      val uuid: String,
      val timestamp: String,
      val name: Name
)

class Name(    
    val firstName:                String,     
    val middleName:               String,       
    val lastName:                 String
)

В какой-то момент в моем коде я хочу сгенерировать DataFrame из этого RDD, чтобы я мог сохранить его в виде файла avro. Я попробовал следующее:

//I get an Object from Hbase here
val objectRDD : RDD[HbaseRecord] = ... 

//I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD : RDD[Row] = objectRDD .map(
    hbaseRecord => {
      val uuid : String = hbaseRecord.uuid
      val timestamp : String = hbaseRecord.timestamp
      val name : Name = hbaseRecord.name

      Row(uuid, timestamp, name)
    })

//Here I define the schema
   val schema = new StructType()
                  .add("uuid",StringType)
                  .add("timestamp", StringType)
                  .add("name", new StructType()
                                  .add("firstName",StringType)
                                  .add("middleName",StringType)
                                  .add("lastName",StringType)

//Now I try to create a Dataframe using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD , schema)

Но я получаю следующую ошибку:

scala.MatchError: (из класса java.lang.String) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 255) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 250) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters.scala: 102) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 260) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 250) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters.scala: 102) в org.apache.spark.sql.catalyst.CatalystTypeConverters $$ anonfun $ createToCatalystConverter $ 2.Apply (CatalystTypeConverters.scala: 401) в org.apache.spark.sql.SQLContext $$ anonfun $ 6.apply (SQLContext.scala: 492) в org.apache.spark.sql.SQLContext $$ anonfun $ 6.apply (SQLContext.scala: 492) на scala.collection.Iterator $$ anon $ 11.next (Iterator.scala: 328) на scala.collection.Iterator $$ anon $ 11.next (Iterator.scala: 328) в scala.collection.Iterator $$ anon $ 10.next (Iterator.scala: 312) в scala.collection.Iterator $ class.foreach (Iterator.scala: 727) в scala.collection.AbstractIterator.foreach (Iterator.scala: 1157) в scala.collection.generic.Growable $ класса $ плюс $ плюс $ эк (Growable.scala: 48). в scala.collection.mutable.ArrayBuffer $ плюс $ плюс $ экв (ArrayBuffer.scala: 103). в scala.collection.mutable.ArrayBuffer $ плюс $ плюс $ эк (ArrayBuffer.scala: 47). в scala.collection.TraversableOnce $ class.to (TraversableOnce.scala: 273) в scala.collection.AbstractIterator.to (Iterator.scala: 1157) в scala.collection.TraversableOnce $ class.toBuffer (TraversableOnce.scala: 265) в scala.collection.AbstractIterator.toBuffer (Iterator.scala: 1157) в scala.collection.TraversableOnce $ class.toArray (TraversableOnce.scala: 252) в scala.collection.AbstractIterator.toArray (Iterator.scala: 1157) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 5.Apply (SparkPlan.scala: 212) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 5.Apply (SparkPlan.scala: 212) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.Apply (SparkContext.scala: 1858) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.Apply (SparkContext.scala: 1858) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) в org.apache.spark.scheduler.Task.run (Task.scala: 89) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 213) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) at java.lang.Thread.run (Thread.java:745)

Я попытался удалить комплексный тип из строки, поэтому он будет Row[String, String], и тогда ошибки не будет. Поэтому я предполагаю, что проблема со сложным типом.

Что я делаю не так? или какой другой подход я мог бы использовать для создания этого DataFrame со сложным типом?

1 Ответ

1 голос
/ 17 марта 2019

Я просто использовал простой case class для этого вместо класса.Столбец name не соответствует определенной схеме.Преобразуйте столбец name в тип строки, и он должен работать.

val rowRDD : RDD[Row] = objectRDD .map(
    hbaseRecord => {
      val uuid : String = hbaseRecord.uuid
      val timestamp : String = hbaseRecord.timestamp
      val name = Row(hbaseRecord.name.firstName,
                     hbaseRecord.name.middleName,hbaseRecord.name.lastName)
      Row(uuid, timestamp, name)
    })
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...