Как преобразовать RDD [Array [Any]] в DataFrame? - PullRequest
1 голос
/ 17 октября 2019

У меня есть RDD [Array [Any]] следующим образом:

1556273771,Mumbai,1189193,1189198,0.56,-1,India,Australia,1571215104,1571215166
8374749403,London,1189193,1189198,0,1,India,England,4567362933,9374749392
7439430283,Dubai,1189193,1189198,0.76,-1,Pakistan,Sri Lanka,1576615684,4749383749

Мне нужно преобразовать это в кадр данных из 10 столбцов, но я новичок в Spark. Пожалуйста, дайте мне знать, как сделать это самым простым способом.

Я пытаюсь что-то похожее на этот код:

rdd_data.map{case Array(a,b,c,d,e,f,g,h,i,j) => (a,b,c,d,e,f,g,h,i,j)}.toDF()

Ответы [ 3 ]

2 голосов
/ 17 октября 2019

Когда вы создаете фрейм данных, Spark должен знать тип данных каждого столбца. Любой тип - это просто способ сказать, что вы не знаете тип переменной. Возможное решение - привести каждое значение к определенному типу. Это, конечно, не удастся, если указанное приведение неверно.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rdd1 = spark.sparkContext.parallelize(
    Array(
        Array(1556273771L,"Mumbai",1189193,1189198 ,0.56,-1,"India",   "Australia",1571215104L,1571215166L),
        Array(8374749403L,"London",1189193,1189198 ,0   , 1,"India",   "England",  4567362933L,9374749392L),
        Array(7439430283L,"Dubai" ,1189193,1189198 ,0.76,-1,"Pakistan","Sri Lanka",1576615684L,4749383749L)
    ),1)
//rdd1: org.apache.spark.rdd.RDD[Array[Any]]

val rdd2 = rdd1.map(r => Row(
    r(0).toString.toLong, 
    r(1).toString, 
    r(2).toString.toInt, 
    r(3).toString.toInt, 
    r(4).toString.toDouble, 
    r(5).toString.toInt, 
    r(6).toString, 
    r(7).toString, 
    r(8).toString.toLong, 
    r(9).toString.toLong
))


val schema = StructType(
List(
    StructField("col0", LongType, false),
    StructField("col1", StringType, false),
    StructField("col2", IntegerType, false),
    StructField("col3", IntegerType, false),
    StructField("col4", DoubleType, false),
    StructField("col5", IntegerType, false),
    StructField("col6", StringType, false),
    StructField("col7", StringType, false),
    StructField("col8", LongType, false),
    StructField("col9", LongType, false)
  ) 
)

val df = spark.createDataFrame(rdd2, schema)

df.show
+----------+------+-------+-------+----+----+--------+---------+----------+----------+
|      col0|  col1|   col2|   col3|col4|col5|    col6|     col7|      col8|      col9|
+----------+------+-------+-------+----+----+--------+---------+----------+----------+
|1556273771|Mumbai|1189193|1189198|0.56|  -1|   India|Australia|1571215104|1571215166|
|8374749403|London|1189193|1189198| 0.0|   1|   India|  England|4567362933|9374749392|
|7439430283| Dubai|1189193|1189198|0.76|  -1|Pakistan|Sri Lanka|1576615684|4749383749|
+----------+------+-------+-------+----+----+--------+---------+----------+----------+

df.printSchema
root
 |-- col0: long (nullable = false)
 |-- col1: string (nullable = false)
 |-- col2: integer (nullable = false)
 |-- col3: integer (nullable = false)
 |-- col4: double (nullable = false)
 |-- col5: integer (nullable = false)
 |-- col6: string (nullable = false)
 |-- col7: string (nullable = false)
 |-- col8: long (nullable = false)
 |-- col9: long (nullable = false)

Надеюсь, это поможет

1 голос
/ 17 октября 2019

Как упоминалось в других публикациях, для DataFrame требуются явные типы для каждого столбца, поэтому вы не можете использовать Any. Самым простым способом, который я могу придумать, было бы превратить каждую строку в кортеж правильных типов, а затем использовать неявное создание DF для преобразования в DataFrame. Вы были довольно близки в своем коде, вам просто нужно привести элементы к приемлемому типу.

В основном toDF знает, как преобразовать кортежи (с принятыми типами) в строку DF, и вы можете передавать имена столбцов в вызов toDF.

Например:

val data = Array(1556273771, "Mumbai", 1189193, 1189198, 0.56, -1, "India,Australia", 1571215104, 1571215166)
val rdd = sc.parallelize(Seq(data))

val df = rdd.map {
    case Array(a,b,c,d,e,f,g,h,i) => (
        a.asInstanceOf[Int],
        b.asInstanceOf[String],
        c.asInstanceOf[Int],
        d.asInstanceOf[Int],
        e.toString.toDouble,
        f.asInstanceOf[Int],
        g.asInstanceOf[String],
        h.asInstanceOf[Int],
        i.asInstanceOf[Int]
    )
}.toDF("int1", "city", "int2", "int3", "float1", "int4", "country", "int5", "int6")

df.printSchema
df.show(100, false)


scala> df.printSchema
root
 |-- int1: integer (nullable = false)
 |-- city: string (nullable = true)
 |-- int2: integer (nullable = false)
 |-- int3: integer (nullable = false)
 |-- float1: double (nullable = false)
 |-- int4: integer (nullable = false)
 |-- country: string (nullable = true)
 |-- int5: integer (nullable = false)
 |-- int6: integer (nullable = false)


scala> df.show(100, false)
+----------+------+-------+-------+------+----+---------------+----------+----------+
|int1      |city  |int2   |int3   |float1|int4|country        |int5      |int6      |
+----------+------+-------+-------+------+----+---------------+----------+----------+
|1556273771|Mumbai|1189193|1189198|0.56  |-1  |India,Australia|1571215104|1571215166|
+----------+------+-------+-------+------+----+---------------+----------+----------+

Редактировать для 0 -> Double:

Как указывал Андре, если вы начнете с 0 как Any, то это будет целое число java, а не scalaInt, и, следовательно, не может быть преобразовано в Scala Double. Преобразование в строку сначала позволяет вам затем преобразовать ее в двойное число.

0 голосов
/ 17 октября 2019

Вы можете попробовать подход ниже, это немного сложно, но не заморачиваясь со схемой. Сопоставьте Any с String, используя toDF(), создайте DataFrame массивов, затем создайте новые столбцы, выбрав каждый элемент из столбца массива.

  val rdd: RDD[Array[Any]] = spark.range(5).rdd.map(s => Array(s,s+1,s%2))
  val size = rdd.first().length

  def splitCol(col: Column): Seq[(String, Column)] = {
    (for (i <- 0 to size - 1) yield ("_" + i, col(i)))
  }

  import spark.implicits._

  rdd.map(s=>s.map(s=>s.toString()))
    .toDF("x")
    .select(splitCol('x).map(_._2):_*)
    .toDF(splitCol('x).map(_._1):_*)
    .show()

+---+---+---+
| _0| _1| _2|
+---+---+---+
|  0|  1|  0|
|  1|  2|  1|
|  2|  3|  0|
|  3|  4|  1|
|  4|  5|  0|
+---+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...