Это можно сделать, написав пользовательскую функцию для объединения нескольких Seq
в один Seq
. Вот способ получить желаемый результат:
Создание входного фрейма данных: Хотя тип данных поля CONFIRMATION_NUMBER
не упоминается в схеме, я принял его за целое число.
import spark.implicits._
val df = Seq(("Stine", "Rocha", Seq(48978451), Seq("Xavier.Vich@gmail"), Seq("MA"), Seq("01545-1300")),
("Aurora", "Markusson", Seq(26341542),Seq(),Seq("AR"),Seq("72716")),
("Stine", "Rocha", Seq(29828771),Seq("Xavier.Vich@gmail"),Seq("OH"), Seq("45101-9613")),
("Aubrey", "Fagerland",Seq(24572991),Seq("Aubrey.Fagerland"),Seq(), Seq())).
toDF("FIRST_NAME", "LAST_NAME", "CONFIRMATION_NUMBER", "SEGMENT_EMAIL", "SEGMENT_ADDRESS_STATE", "SEGMENT_ADDRESS_POSTAL_CODE")
Агрегированные столбцы: Теперь примените агрегацию к нужным столбцам, чтобы получить Seq
из Seq
. Вот код для этого:
import org.apache.spark.sql.functions.collect_list
val df1 = df.groupBy("FIRST_NAME", "LAST_NAME").
agg(collect_list("CONFIRMATION_NUMBER").as("cnlist"),
collect_list("SEGMENT_EMAIL").as("selist"),
collect_list("SEGMENT_ADDRESS_STATE").as("saslist"),
collect_list("SEGMENT_ADDRESS_POSTAL_CODE").as("sapclist"))
Вот вывод df1
:
+----------+---------+------------------------+------------------------------------------+------------+----------------------------+
|FIRST_NAME|LAST_NAME|cnlist |selist |saslist |sapclist |
+----------+---------+------------------------+------------------------------------------+------------+----------------------------+
|Stine |Rocha |[[48978451], [29828771]]|[[Xavier.Vich@gmail], [Xavier.Vich@gmail]]|[[MA], [OH]]|[[01545-1300], [45101-9613]]|
|Aurora |Markusson|[[26341542]] |[[]] |[[AR]] |[[72716]] |
|Aubrey |Fagerland|[[24572991]] |[[Aubrey.Fagerland]] |[[]] |[[]] |
+----------+---------+------------------------+------------------------------------------+------------+----------------------------+
Apply udf: Теперь примените пользовательскую функцию (udf), чтобы объединить массив массива в один массив. Я написал два udf для целочисленного и строкового типа данных.
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val concat_nested_string_seq:UserDefinedFunction = udf((seq_values:Seq[Seq[String]]) => {
var output_seq:Seq[String] = Seq()
seq_values.foreach(output_seq ++= _)
(output_seq)
})
val concat_nested_integer_seq:UserDefinedFunction = udf((seq_values:Seq[Seq[Integer]]) => {
var output_seq:Seq[Integer] = Seq()
seq_values.foreach(output_seq ++= _)
(output_seq)
})
val output_df = df1.withColumn("CONFIRMATION_NUMBER", concat_nested_integer_seq($"cnlist")).
withColumn("SEGMENT_EMAIL", concat_nested_string_seq($"selist")).
withColumn("SEGMENT_ADDRESS_STATE", concat_nested_string_seq($"saslist")).
withColumn("SEGMENT_ADDRESS_POSTAL_CODE", concat_nested_string_seq($"sapclist")).
drop("cnlist", "selist", "saslist", "sapclist")
Фрейм данных output_df
показывает желаемый результат. Это также может быть решено путем выравнивания столбцов типа данных массива и последующего агрегирования по столбцам. Но это может быть дорогостоящей операцией.