создание кадра данных Spark из последовательности карт - PullRequest
0 голосов
/ 04 сентября 2018

У меня есть последовательность карт. Каждая карта содержит имена столбцов в качестве ключей и значения столбцов в качестве значений. Итак, одна карта описывает одну строку. Я не знаю, сколько записей будет на карте. Поэтому я не могу создать кортеж фиксированной длины в своем коде. Я хочу преобразовать последовательность в фрейм данных. Я попробовал следующий код:

val mapRDD= sc.parallelize(Seq(
  Map("col1" -> "10", "col2" -> "Rohan", "col3" -> "201"),
  Map("col1" -> "13", "col2" -> "Ross", "col3" -> "201")
  ))

val columns=mapRDD.take(1).flatMap(a=>a.keys)

val resultantDF=mapRDD.map{value=> // Exception is thrown from this block
  value.values.toList 
}.toDF(columns:_*)

resultantDF.show()

Но он дал следующее исключение:

org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType 
java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:155)
...

Я пробовал несколько других подходов, но ничего не получалось.

1 Ответ

0 голосов
/ 04 сентября 2018

Вы можете попробовать следующий подход.

  1. Извлечение column имен и создание ниже dataframe из заданного rdd

    val columns=mapRDD.take(1).flatMap(a=>a.keys).toSeq
    val df=mapRDD.map(_.values.toList).toDF
    
    //df look like below
    +----------------+
    |           value|
    +----------------+
    |[10, Rohan, 201]|
    | [13, Ross, 201]|
    +----------------+
    
  2. Теперь создайте schema динамически и пользовательскую функцию, как показано ниже

    //Required imports
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Row
    import scala.collection.mutable.WrappedArray
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.StructType
    
    //Creating udf the will return a Row of your schema
    def getRow(arr:WrappedArray[String]):Row=Row.fromSeq(arr.toSeq)
    
    //Creating schema
    val udfSchema=StructType(columns.map(x=>StructField(x,StringType,true)))
    
    //Registering udf along with schema
    val getRowUDF=udf(getRow _,udfSchema)
    
    //Now calling udf and generating one new column
    val df2=df.withColumn("temp",getRowUDF(df.col("value")))
    
    //df2 will look like
    +----------------+--------------+
    |           value|          temp|
    +----------------+--------------+
    |[10, Rohan, 201]|[10,Rohan,201]|
    | [13, Ross, 201]| [13,Ross,201]|
    +----------------+--------------+
    
  3. Теперь получите ваш окончательный dataframe от df2, используя ваш column list

    val query=columns.map(x=>df2.col("temp."+x))
    df2.select(query:_*).show
    
    //output
    +----+-----+----+
    |col1| col2|col3|
    +----+-----+----+
    |  10|Rohan| 201|
    |  13| Ross| 201|
    +----+-----+----+
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...