I can think of two approaches, but I'm not entirely happy with either.
First, define your aggregates as a list of Columns. The annoyance is that, to satisfy the method signature, you have to add a dummy aggregate and then drop the resulting columns after the aggregation:
scala> val in = spark.read.option("header", true).csv("""_id,Type,Value,OType,DateTime
| 0,a,b,c,d
| 1,aaa,bbb,ccc,ddd""".split("\n").toSeq.toDS)
in: org.apache.spark.sql.DataFrame = [_id: string, Type: string ... 3 more fields]
scala> in.show
+---+----+-----+-----+--------+
|_id|Type|Value|OType|DateTime|
+---+----+-----+-----+--------+
| 0| a| b| c| d|
| 1| aaa| bbb| ccc| ddd|
+---+----+-----+-----+--------+
scala> val aggColumns = Seq("Value", "OType", "DateTime").map{c => first(c).alias(c)}
aggColumns: Seq[org.apache.spark.sql.Column] = List(first(Value, false) AS `Value`, first(OType, false) AS `OType`, first(DateTime, false) AS `DateTime`)
scala> val df_intermediate = in.groupBy("_id").pivot("Type").agg(lit("dummy"), aggColumns : _*)
df_intermediate: org.apache.spark.sql.DataFrame = [_id: string, a_dummy: string ... 7 more fields]
scala> df_intermediate.show
+---+-------+-------+-------+----------+---------+---------+---------+------------+
|_id|a_dummy|a_Value|a_OType|a_DateTime|aaa_dummy|aaa_Value|aaa_OType|aaa_DateTime|
+---+-------+-------+-------+----------+---------+---------+---------+------------+
| 0| dummy| b| c| d| dummy| null| null| null|
| 1| dummy| null| null| null| dummy| bbb| ccc| ddd|
+---+-------+-------+-------+----------+---------+---------+---------+------------+
scala> val df_final = df_intermediate.drop(df_intermediate.schema.collect{case c if c.name.endsWith("_dummy") => c.name} : _*)
df_final: org.apache.spark.sql.DataFrame = [_id: string, a_Value: string ... 5 more fields]
scala> df_final.show
+---+-------+-------+----------+---------+---------+------------+
|_id|a_Value|a_OType|a_DateTime|aaa_Value|aaa_OType|aaa_DateTime|
+---+-------+-------+----------+---------+---------+------------+
| 0| b| c| d| null| null| null|
| 1| null| null| null| bbb| ccc| ddd|
+---+-------+-------+----------+---------+---------+------------+
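If you end up doing this in more than one place, the dummy-and-drop boilerplate can be folded into a small helper. This is only a sketch under the same assumptions as the transcript above; pivotFirst is a name I made up, not part of the Spark API:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{first, lit}

// Hypothetical helper wrapping the dummy-column trick shown above.
def pivotFirst(df: DataFrame, groupCol: String, pivotCol: String, valueCols: Seq[String]): DataFrame = {
  // Build the real aggregates, keeping the original column names.
  val aggColumns: Seq[Column] = valueCols.map(c => first(c).alias(c))
  // agg(expr, exprs: _*) needs at least one positional Column argument,
  // so pass a throwaway literal as the first aggregate.
  val pivoted = df.groupBy(groupCol).pivot(pivotCol).agg(lit("dummy"), aggColumns: _*)
  // Drop every "<pivotValue>_dummy" column the placeholder produced.
  val dummyCols = pivoted.columns.filter(_.endsWith("_dummy"))
  pivoted.drop(dummyCols: _*)
}

With the example data, pivotFirst(in, "_id", "Type", Seq("Value", "OType", "DateTime")) should give the same result as df_final above.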
For the second approach, keep using a Map of agg expressions, then use a regular expression to find the renamed columns and change them back:
scala> val aggExprs = Map(("OType" -> "first"), ("Value" -> "first"), "DateTime" -> "first")
aggExprs: scala.collection.immutable.Map[String,String] = Map(OType -> first, Value -> first, DateTime -> first)
scala> val df_intermediate = in.groupBy("_id").pivot("Type").agg(aggExprs)
df_intermediate: org.apache.spark.sql.DataFrame = [_id: string, a_first(OType, false): string ... 5 more fields]
scala> df_intermediate.show
+---+---------------------+---------------------+------------------------+-----------------------+-----------------------+--------------------------+
|_id|a_first(OType, false)|a_first(Value, false)|a_first(DateTime, false)|aaa_first(OType, false)|aaa_first(Value, false)|aaa_first(DateTime, false)|
+---+---------------------+---------------------+------------------------+-----------------------+-----------------------+--------------------------+
| 0| c| b| d| null| null| null|
| 1| null| null| null| ccc| bbb| ddd|
+---+---------------------+---------------------+------------------------+-----------------------+-----------------------+--------------------------+
scala> val regex = "^(.*)_first\\((.*), false\\)$".r
regex: scala.util.matching.Regex = ^(.*)_first\((.*), false\)$
scala> val replacements = df_intermediate.schema.collect{ case c if regex.findFirstMatchIn(c.name).isDefined =>
| val regex(pivotVal, colName) = c.name
| c.name -> s"${pivotVal}_$colName"
| }.toMap
replacements: scala.collection.immutable.Map[String,String] = Map(a_first(DateTime, false) -> a_DateTime, aaa_first(DateTime, false) -> aaa_DateTime, aaa_first(OType, false) -> aaa_OType, a_first(Value, false) -> a_Value, a_first(OType, false) -> a_OType, aaa_first(Value, false) -> aaa_Value)
scala> val df_final = replacements.foldLeft(df_intermediate){(df, c) => df.withColumnRenamed(c._1, c._2)}
df_final: org.apache.spark.sql.DataFrame = [_id: string, a_OType: string ... 5 more fields]
scala> df_final.show
+---+-------+-------+----------+---------+---------+------------+
|_id|a_OType|a_Value|a_DateTime|aaa_OType|aaa_Value|aaa_DateTime|
+---+-------+-------+----------+---------+---------+------------+
| 0| c| b| d| null| null| null|
| 1| null| null| null| ccc| bbb| ddd|
+---+-------+-------+----------+---------+---------+------------+
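The rename step can likewise be packaged as a reusable function. A minimal sketch, assuming the default "<pivotValue>_first(<col>, false)" naming that agg(Map(...)) produces in this Spark version; renamePivotedFirst is my own name for it:

import org.apache.spark.sql.DataFrame

// Hypothetical helper: strip the "first(..., false)" wrapper from pivoted column names.
def renamePivotedFirst(df: DataFrame): DataFrame = {
  val regex = "^(.*)_first\\((.*), false\\)$".r
  df.columns.foldLeft(df) { (acc, name) =>
    name match {
      // Rename "<pivotValue>_first(<col>, false)" to "<pivotValue>_<col>".
      case regex(pivotVal, colName) => acc.withColumnRenamed(name, s"${pivotVal}_$colName")
      // Leave columns that do not match (e.g. the group-by column) untouched.
      case _                        => acc
    }
  }
}

Calling renamePivotedFirst(df_intermediate) should reproduce df_final from the transcript above.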
Take your pick, but both involve a few unnecessary steps.