У меня есть RDD[HbaseRecord]
, который содержит пользовательский комплексный тип Name
. Оба класса определены ниже:
class HbaseRecord(
val uuid: String,
val timestamp: String,
val name: Name
)
class Name(
val firstName: String,
val middleName: String,
val lastName: String
)
В какой-то момент в моем коде я хочу сгенерировать DataFrame из этого RDD, чтобы я мог сохранить его в виде файла avro. Я попробовал следующее:
//I get an Object from Hbase here
val objectRDD : RDD[HbaseRecord] = ...
//I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD : RDD[Row] = objectRDD .map(
hbaseRecord => {
val uuid : String = hbaseRecord.uuid
val timestamp : String = hbaseRecord.timestamp
val name : Name = hbaseRecord.name
Row(uuid, timestamp, name)
})
//Here I define the schema
val schema = new StructType()
.add("uuid",StringType)
.add("timestamp", StringType)
.add("name", new StructType()
.add("firstName",StringType)
.add("middleName",StringType)
.add("lastName",StringType)
//Now I try to create a Dataframe using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD , schema)
Но я получаю следующую ошибку:
scala.MatchError: (из класса java.lang.String) в
org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 255)
в
org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 250)
в
org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters.scala: 102)
в
org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 260)
в
org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 250)
в
org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters.scala: 102)
в
org.apache.spark.sql.catalyst.CatalystTypeConverters $$ anonfun $ createToCatalystConverter $ 2.Apply (CatalystTypeConverters.scala: 401)
в
org.apache.spark.sql.SQLContext $$ anonfun $ 6.apply (SQLContext.scala: 492)
в
org.apache.spark.sql.SQLContext $$ anonfun $ 6.apply (SQLContext.scala: 492)
на scala.collection.Iterator $$ anon $ 11.next (Iterator.scala: 328) на
scala.collection.Iterator $$ anon $ 11.next (Iterator.scala: 328) в
scala.collection.Iterator $$ anon $ 10.next (Iterator.scala: 312) в
scala.collection.Iterator $ class.foreach (Iterator.scala: 727) в
scala.collection.AbstractIterator.foreach (Iterator.scala: 1157) в
scala.collection.generic.Growable $ класса $ плюс $ плюс $ эк (Growable.scala: 48).
в
scala.collection.mutable.ArrayBuffer $ плюс $ плюс $ экв (ArrayBuffer.scala: 103).
в
scala.collection.mutable.ArrayBuffer $ плюс $ плюс $ эк (ArrayBuffer.scala: 47).
в
scala.collection.TraversableOnce $ class.to (TraversableOnce.scala: 273)
в scala.collection.AbstractIterator.to (Iterator.scala: 1157) в
scala.collection.TraversableOnce $ class.toBuffer (TraversableOnce.scala: 265)
в scala.collection.AbstractIterator.toBuffer (Iterator.scala: 1157)
в
scala.collection.TraversableOnce $ class.toArray (TraversableOnce.scala: 252)
в scala.collection.AbstractIterator.toArray (Iterator.scala: 1157) в
org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 5.Apply (SparkPlan.scala: 212)
в
org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 5.Apply (SparkPlan.scala: 212)
в
org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.Apply (SparkContext.scala: 1858)
в
org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.Apply (SparkContext.scala: 1858)
в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66)
в org.apache.spark.scheduler.Task.run (Task.scala: 89) в
org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 213)
в
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
в
java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617)
at java.lang.Thread.run (Thread.java:745)
Я попытался удалить комплексный тип из строки, поэтому он будет Row[String, String]
, и тогда ошибки не будет. Поэтому я предполагаю, что проблема со сложным типом.
Что я делаю не так? или какой другой подход я мог бы использовать для создания этого DataFrame со сложным типом?