Из RDD массива массивов в dataframe - PullRequest
0 голосов
/ 19 июня 2019

После выполнения некоторой операции я получил rdd (например, следующий) массива (любого), где все значения имеют тип Int ожидают, что 3,8 и 13 имеют тип string.

Array[Array[Any]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15))

Используйте следующий код для справки:

var exp = sc.parallelize(Array(Array(1,2,"3",4,5),Array(6,7,"8",9,10),Array(11,12,"13",14,15)))

Теперь я пытаюсь создать фрейм данных из этого массива, используя класс case, в котором имя столбца и класс case следующие:

case class specialchar(alpha:Int,beta:Int,gamma:String,theta:Int,zeta:Int) 

Мне нужна помощь, как мы можем перебрать rdd-массив Array [Array [Any]] и сохранить его в dataframe.Заранее спасибо.

1 Ответ

0 голосов
/ 19 июня 2019

Udf для обработки Any.

def toInt(x: Any): Option[Int] = x match {
  case i: Int => Some(i)
  case _ => None
}

def toStr(x: Any): Option[String] = x match {
  case i: String => Some(i)
  case _ => None
}

Классы дел и преобразование Array в Df.

var exp = sc.parallelize(Array(Array(1,2,"3",4,5),Array(6,7,"8",9,10),Array(11,12,"13",14,15)))
case class specialchar(alpha:Int,beta:Int,gamma:String,theta:Int,zeta:Int)  

var specialCharDf = Seq.empty[specialchar].toDF

exp.collect().foreach(x => {
    val a:Int = toInt(x(0)).getOrElse(1)
    val b:Int = toInt(x(1)).getOrElse(1)
    val c:String = toStr(x(2)).getOrElse("1")
    val d:Int = toInt(x(3)).getOrElse(1)
    val e:Int = toInt(x(4)).getOrElse(1)

    println(a, b, c, d, e)

    val specialcharTempDf =  Seq(specialchar(a,b,c,d,e)).toDF
    specialCharDf = specialcharTempDf.union(specialCharDf)
})

specialCharDf.printSchema() //follows schema desired.

РЕДАКТИРОВАТЬ РЕДАКТИРОВАТЬ РЕДАКТИРОВАТЬ - Ахил упомянул, что в конце они должны быть целыми числами.Новое решение ниже:

    var exp = sc.parallelize(Array(Array(1,2,"3",4,5),Array(6,7,"8",9,10),Array(11,12,"13",14,15)))
    case class specialchar(alpha:Int,beta:Int,gamma:Int,theta:Int,zeta:Int)  

    var specialCharDf = Seq.empty[specialchar].toDF

exp.collect().foreach(x => {
    val a:Int = toInt(x(0)).getOrElse(1)
    val b:Int = toInt(x(1)).getOrElse(1)
    val c:String = toStr(x(2)).getOrElse("1")
    val f = c.toInt
    val d:Int = toInt(x(3)).getOrElse(1)
    val e:Int = toInt(x(4)).getOrElse(1)

    println(a, b, f, d, e)

    val specialcharTempDf =  Seq(specialchar(a,b,f,d,e)).toDF
    specialCharDf = specialcharTempDf.union(specialCharDf)
})

specialCharDf.printSchema() //follows schema desired.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...