Как добавить пустые столбцы в структуру сложного массива в Spark с помощью udf - PullRequest
2 голосов
/ 09 июля 2019

Я пытаюсь добавить пустые столбцы в столбец embebed array [struct], таким образом я смогу преобразовать похожий сложный столбец:

  case class Additional(id: String, item_value: String)
  case class Element(income:String,currency:String,additional: Additional)
  case class Additional2(id: String, item_value: String, extra2: String)
  case class Element2(income:String,currency:String,additional: Additional2)

  val  my_uDF = fx.udf((data: Seq[Element]) => {
    data.map(x=>new Element2(x.income,x.currency,new Additional2(x.additional.id,x.additional.item_value,null))).seq
  })
  sparkSession.sqlContext.udf.register("transformElements",my_uDF)
  val result=sparkSession.sqlContext.sql("select transformElements(myElements),line_number,country,idate from entity where line_number='1'")

Цель состоит в том, чтобы добавить к Элементу. Дополнительно дополнительное поле с именем extra2, по этой причине я сопоставляю это поле с UDF, но оно не выполняется, потому что:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

Если я печатаю схему для поля «Элементы», то отображается:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

И я пытаюсь преобразовать в эту схему:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)

Ответы [ 2 ]

2 голосов
/ 15 июля 2019

Вот еще один подход, использующий наборы данных вместо фреймов данных, чтобы получить прямой доступ к объектам вместо использования Row.Существует еще один метод с именем asElement2, который преобразует Element в Element2.

case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional2: Additional2)

case class Additional(id: String, item_value: String)
case class Element(income:String, currency:String, additional: Additional){
  def asElement2(): Element2 ={
    val additional2 = Additional2(additional.id, additional.item_value, null)
    Element2(income, currency, additional2)
  }
}

val df = Seq(
  (Seq(Element("150000", "EUR", Additional("001", "500EUR")))),
  (Seq(Element("50000", "CHF", Additional("002", "1000CHF"))))
).toDS()

df.map{
  se => se.map{_.asElement2} 
}

//or even simpler
df.map{_.map{_.asElement2}}

Выход:

+-------------------------------+
|value                          |
+-------------------------------+
|[[150000, EUR, [001, 500EUR,]]]|
|[[50000, CHF, [002, 1000CHF,]]]|
+-------------------------------+

Конечная схема:

root
 |-- value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional2: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
2 голосов
/ 12 июля 2019

Проще просто выполнить необходимые преобразования вложенных элементов строки в DataFrame с помощью map и переименовать столбец с помощью toDF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional: Additional2)

val df = Seq(
  (Seq(Element("70k", "US", Additional("1", "101")), Element("90k", "US", Additional("2", "202")))),
  (Seq(Element("80k", "US", Additional("3", "303"))))
).toDF("myElements")

val df2 = df.map{ case Row(s: Seq[Row] @unchecked) => s.map{
  case Row(income: String, currency: String, additional: Row) => additional match {
    case Row(id: String, item_value: String) =>
      Element2(income, currency, Additional2(id, item_value, null))
  }}
}.toDF("myElements")

df2.show(false)
// +--------------------------------------------+
// |myElements                                  |
// +--------------------------------------------+
// |[[70k, US, [1, 101,]], [90k, US, [2, 202,]]]|
// |[[80k, US, [3, 303,]]]                      |
// +--------------------------------------------+

df2.printSchema
// root
//  |-- myElements: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- income: string (nullable = true)
//  |    |    |-- currency: string (nullable = true)
//  |    |    |-- additional: struct (nullable = true)
//  |    |    |    |-- id: string (nullable = true)
//  |    |    |    |-- item_value: string (nullable = true)
//  |    |    |    |-- extra2: string (nullable = true)

Если по какой-то причине предпочтительным является UDF,необходимые преобразования по сути одинаковы:

val  myUDF = udf((s: Seq[Row]) => s.map{
  case Row(income: String, currency: String, additional: Row) => additional match {
    case Row(id: String, item_value: String) =>
      Element2(income, currency, Additional2(id, item_value, null))
  }
})

val df2 = df.select(myUDF($"myElements").as("myElements"))
...