Передача типа структуры в методы или UDFS в кадрах данных spark sql - PullRequest
0 голосов
/ 21 февраля 2020

У меня есть два фрейма данных, и я присоединился к ним, и после объединения в объединенный фрейм данных у меня есть два столбца типа struct. В основном они из Array [[String, Int]]. Мне нужно вывести третий столбец на основе элементов этого типа структуры.

Мой код выглядит следующим образом.

val bdf = Seq(
 ("a",1,1,10)
,("a",1,2,10)
,("a",1,3,10)
,("a",1,4,10)
,("b",1,1,20)
,("b",1,2,10)
,("a",2,3,10)
,("a",2,4,20)
,("a",2,5,20)
,("c",2,1,10)
,("c",2,2,20)
,("c",2,3,20)
).toDF("contract_number","linenumber","monthdel","open_quant")

val gbdf = bdf.withColumn("bmergedcol",struct(bdf("monthdel"),bdf("open_quant"))).groupBy("contract_number","linenumber").agg(collect_list("bmergedcol"))





val pl = Seq(
("a",1,"FLAT",10)
,("a",1,"FLAT",30)
,("a",1,"NFE",10)
,("b",1,"FLAT",10)
,("b",1,"NFE",10)
,("c",2,"NFE",10)
,("a",3,"NFE",20)
,("c",2,"FLAT",20)).toDF("connum","linnum","type","qnt")

import org.apache.spark.sql.functions._ 
val gpl = pl.withColumn("mergedcol",struct(pl("type"),pl("qnt"))).groupBy("connum","linnum").agg(collect_list("mergedcol"))


val jdf = gbdf.join(gpl,expr("((contract_number = connum) AND (linenumber = linnum ))"),"left_outer")

Мой вывод в формате jdf выглядит как

enter image description here

Мне нужно понять, как я могу передать два поля типа структуры в какой-либо метод и извлечь из него третье?

Ответы [ 2 ]

0 голосов
/ 21 февраля 2020

Оба массива структур должны вводить ваш UDF как Seq[Row], который затем можно отобразить в кортежи, указав типы структур (я думаю, что это строка int в вашем случае). В этом примере я использую сопоставление с образцом на Row, но есть и другие способы сделать это (например, используя Row#.getAs):

val myUDF = udf((arr1:Seq[Row],arr2:Seq[Row]) => {
  // convert to tuples
  val arr1Tup: Seq[(String, Int)] = arr1.map{case Row(s:String,i:Int) => (s,i)}
  val arr2Tup: Seq[(String, Int)] = arr2.map{case Row(s:String,i:Int) => (s,i)}
  // now do derive new quantities
})

Используя 2 последовательности кортежей, вы можете получить свой новый столбец

0 голосов
/ 21 февраля 2020

Пользовательские функции (также известные как UDF) - это функция Spark SQL для определения новых основанных на столбцах функций, преобразующих наборы данных. UDF может использоваться для передачи двух полей типа структуры для получения результата.

val customUdf = udf((col1: Seq[Row], col2: Int) => {
  // This is an example.
  col1(1).getAs[String]("type") + "--" + col2
})
val cdf = jdf.withColumn("custom", customUdf(jdf.col("collect_list(mergedcol)"), jdf.col("linnum")))
cdf.show(10)

В приведенном выше примере udf col1 - это Seq [Row], так как он является массивом типа структуры, если должен быть только тип структуры доступ, чем просто строка должна быть использована.

...