Как создать Spark SQL Dataframe со списком объектов Map - PullRequest
0 голосов
/ 17 февраля 2019

У меня есть несколько Map[String, String] в List (Scala).Например:

map1 = Map("EMP_NAME" -> “Ahmad”, "DOB" -> “01-10-1991”, "CITY" -> “Dubai”)
map2 = Map("EMP_NAME" -> “Rahul”, "DOB" -> “06-12-1991”, "CITY" -> “Mumbai”)
map3 = Map("EMP_NAME" -> “John”, "DOB" -> “11-04-1996”, "CITY" -> “Toronto”)
list = List(map1, map2, map3)

Теперь я хочу создать один кадр данных с чем-то вроде этого:

EMP_NAME    DOB             CITY
Ahmad       01-10-1991      Dubai
Rahul       06-12-1991      Mumbai
John        11-04-1996      Toronto

Как мне этого добиться?

Ответы [ 3 ]

0 голосов
/ 17 февраля 2019

Немного менее конкретный подход, например:

val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
val map2 = Map("EMP_NAME" -> "John",  "DOB" -> "01-10-1992", "CITY" -> "Mumbai")
///...
val list = List(map1, map2) // map3, ...
val RDDmap = sc.parallelize(list)

// Get cols dynamically
val cols = RDDmap.take(1).flatMap(x=> x.keys)

// Map is K,V like per Map entry
val df = RDDmap.map{ value=>
                     val list=value.values.toList
                     (list(0), list(1), list(2))
       }.toDF(cols:_*) // dynamic column names assigned

df.show(false)

возвращает:

+--------+----------+------+
|EMP_NAME|DOB       |CITY  |
+--------+----------+------+
|Ahmad   |01-10-1991|Dubai |
|John    |01-10-1992|Mumbai|
+--------+----------+------+

или чтобы ответить на ваш подвопрос, вот так - по крайней мере, я думаю, что это то, что выспрашивают, но, вероятно, нет:

val RDDmap = sc.parallelize(List(
   Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai"),
   Map("EMP_NAME" -> "John",  "DOB" -> "01-10-1992", "CITY" -> "Mumbai")))
   ...

// Get cols dynamically
val cols = RDDmap.take(1).flatMap(x=> x.keys)

// Map is K,V like per Map entry
val df = RDDmap.map{ value=>
                 val list=value.values.toList
                 (list(0), list(1), list(2))
       }.toDF(cols:_*) // dynamic column names assigned

Конечно, вы можете динамически создавать список, но вам все равно нужно назначить элементы Map.См. Добавление данных в список или любую другую коллекцию динамически в scala .Я просто прочитал бы из файла и покончил бы с этим.

0 голосов
/ 18 февраля 2019
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DataFrameTest2 extends Serializable {
  var sparkSession: SparkSession = _
  var sparkContext: SparkContext = _
  var sqlContext: SQLContext = _

  def main(args: Array[String]): Unit = {
    sparkSession = SparkSession.builder().appName("TestMaster").master("local").getOrCreate()
    sparkContext = sparkSession.sparkContext

    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)

    val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
    val map2 = Map("EMP_NAME" -> "Rahul", "DOB" -> "06-12-1991", "CITY" -> "Mumbai")
    val map3 = Map("EMP_NAME" -> "John", "DOB" -> "11-04-1996", "CITY" -> "Toronto")
    val list = List(map1, map2, map3)

    //create your rows
    val rows = list.map(m => Row(m.values.toSeq:_*))

    //create the schema from the header
    val header = list.head.keys.toList
    val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, true)))

    //create your rdd
    val rdd = sparkContext.parallelize(rows)

    //create your dataframe using rdd
    val df = sparkSession.createDataFrame(rdd, schema)
    df.show()
  }
}
0 голосов
/ 17 февраля 2019

вы можете сделать это так:

import spark.implicits._

val df = list
  .map( m => (m.get("EMP_NAME"),m.get("DOB"),m.get("CITY")))
  .toDF("EMP_NAME","DOB","CITY")

df.show()

+--------+----------+-------+
|EMP_NAME|       DOB|   CITY|
+--------+----------+-------+
|   Ahmad|01-10-1991|  Dubai|
|   Rahul|06-12-1991| Mumbai|
|    John|11-04-1996|Toronto|
+--------+----------+-------+
...