Как создать Spark DataFrame из RDD [Row], когда Row содержит Map [Map] - PullRequest
1 голос
/ 19 марта 2019

Этот вопрос является продолжением этого другого , где пользователь который дал правильный ответ, попросил новый вопрос, чтобы объяснить мои дальнейшие сомнения.

Я пытаюсь сгенерировать фрейм данных из RDD[Objects], где мои объекты имеют примитивные типы, но также и сложные типы. В предыдущих вопросах было объяснено, как анализировать карту сложного типа.

Далее я попытался экстраполировать данное решение для анализа карты [карты]. Таким образом, в DataFrame он преобразуется в массив (Map).

Ниже я приведу код, который я написал до сих пор:

//I get an Object from Hbase here
val objectRDD : RDD[HbaseRecord] = ... 

//I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD : RDD[Row] = objectRDD.map(
    hbaseRecord => {

        val uuid : String = hbaseRecord.uuid
        val timestamp : String = hbaseRecord.timestamp

        val name = Row(hbaseRecord.nameMap.firstName.getOrElse(""),
            hbaseRecord.nameMap.middleName.getOrElse(""),
            hbaseRecord.nameMap.lastName.getOrElse(""))

        val contactsMap = hbaseRecord.contactsMap 

        val homeContactMap = contactsMap.get("HOME")
        val homeContact = Row(homeContactMap.contactType,
            homeContactMap.areaCode,
            homeContactMap.number)

        val workContactMap = contactsMap.get("WORK")
        val workContact = Row(workContactMap.contactType,
            workContactMap.areaCode,
            workContactMap.number)

        val contacts = Row(homeContact,workContact)

        Row(uuid, timestamp, name, contacts)

    }
)


//Here I define the schema
   val schema = new StructType()
                    .add("uuid",StringType)
                    .add("timestamp", StringType)
                    .add("name", new StructType()
                            .add("firstName",StringType)
                            .add("middleName",StringType)
                            .add("lastName",StringType)
                    .add("contacts", new StructType(
                                   Array(
                                   StructField("contactType", StringType),
                                   StructField("areaCode", StringType),
                                   StructField("number", StringType)
                    )))  


//Now I try to create a Dataframe using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD , schema)

Но я получаю следующую ошибку:

19/03/18 12:09:53 ОШИБКА executor.Executor: Исключение в задании 0.0 в Этап 1.0 (TID 8) scala.MatchError: [HOME, 05,12345678] (класса org.apache.spark.sql.catalyst.expressions.GenericRow) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StringConverter $ .toCatalystImpl (CatalystTypeConverters.scala: 295) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StringConverter $ .toCatalystImpl (CatalystTypeConverters.scala: 294) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters.scala: 102) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 260) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 250) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters.scala: 102) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 260) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 250) в org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters.scala: 102) в org.apache.spark.sql.catalyst.CatalystTypeConverters $$ anonfun $ createToCatalystConverter $ 2.Apply (CatalystTypeConverters.scala: 401) в org.apache.spark.sql.SQLContext $$ anonfun $ 6.apply (SQLContext.scala: 492) в org.apache.spark.sql.SQLContext $$ anonfun $ 6.apply (SQLContext.scala: 492) на scala.collection.Iterator $$ anon $ 11.next (Iterator.scala: 328) на scala.collection.Iterator $$ anon $ 11.next (Iterator.scala: 328) в scala.collection.Iterator $$ anon $ 10.next (Iterator.scala: 312) в scala.collection.Iterator $ class.foreach (Iterator.scala: 727) в scala.collection.AbstractIterator.foreach (Iterator.scala: 1157) в scala.collection.generic.Growable $ класса $ плюс $ плюс $ эк (Growable.scala: 48). в scala.collection.mutable.ArrayBuffer $ плюс $ плюс $ экв (ArrayBuffer.scala: 103). в scala.collection.mutable.ArrayBuffer $ плюс $ плюс $ эк (ArrayBuffer.scala: 47). в scala.collection.TraversableOnce $ class.to (TraversableOnce.scala: 273) в scala.collection.AbstractIterator.to (Iterator.scala: 1157) в scala.collection.TraversableOnce $ class.toBuffer (TraversableOnce.scala: 265) в scala.collection.AbstractIterator.toBuffer (Iterator.scala: 1157) в scala.collection.TraversableOnce $ class.toArray (TraversableOnce.scala: 252) в scala.collection.AbstractIterator.toArray (Iterator.scala: 1157) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 5.Apply (SparkPlan.scala: 212) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 5.Apply (SparkPlan.scala: 212) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.Apply (SparkContext.scala: 1858) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.Apply (SparkContext.scala: 1858) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) в org.apache.spark.scheduler.Task.run (Task.scala: 89) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 213) вjava.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) at java.lang.Thread.run (Thread.java:745)

Я также попытался сгенерировать элемент контактов в виде массива:

val contacts = Array(homeContact,workContact)

Но тогда я получаю следующую ошибку:

scala.MatchError: [Lorg.apache.spark.sql.Row; @ 726c6aec (класса [Lorg.apache.spark.sql.Row;) * * тысяча двадцать-семь

Может кто-нибудь определить проблему?

1 Ответ

2 голосов
/ 19 марта 2019

Давайте упростим вашу ситуацию для вашего массива контактов.Вот в чем проблема.Вы пытаетесь использовать эту схему:

val schema = new StructType()
                .add("contacts", new StructType(
                               Array(
                               StructField("contactType", StringType),
                               StructField("areaCode", StringType),
                               StructField("number", StringType)
                )))

для хранения списка контактов, который является структурным типом.Тем не менее, эта схема не может содержать список, только один контакт.Мы можем проверить это с помощью:

spark.createDataFrame(sc.parallelize(Seq[Row]()), schema).printSchema
root
 |-- contacts: struct (nullable = true)
 |    |-- contactType: string (nullable = true)
 |    |-- areaCode: string (nullable = true)
 |    |-- number: string (nullable = true)

В самом деле, Array, который вы имеете в своем коде, предназначен для того, чтобы содержать поля вашего типа структуры "contacts".Вы хотите, тип существует: ArrayType.Это дает немного другой результат:

val schema_ok = new StructType()
    .add("contacts", ArrayType(new StructType(Array(
        StructField("contactType", StringType),
        StructField("areaCode", StringType),
        StructField("number", StringType)))))

spark.createDataFrame(sc.parallelize(Seq[Row]()), schema_ok).printSchema
root
 |-- contacts: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- contactType: string (nullable = true)
 |    |    |-- areaCode: string (nullable = true)
 |    |    |-- number: string (nullable = true)

, и он работает:

val row = Row(Array(
                Row("type", "code", "number"), 
                Row("type2", "code2", "number2")))
spark.createDataFrame(sc.parallelize(Seq(row)), schema_ok).show(false)
+-------------------------------------------+
|contacts                                   |
+-------------------------------------------+
|[[type,code,number], [type2,code2,number2]]|
+-------------------------------------------+

Так что, если вы обновляете схему этой версией «контактов», просто замените val contacts = Row(homeContact,workContact) на val contacts = Array(homeContact,workContact) и это должно работать.

Примечание: если вы хотите пометить свои контакты (HOME или WORK), существует также тип MapType.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...