Вот общий подход для транспонирования DataFrame:
- Для каждого из основных столбцов (скажем,
c1
, c2
, c3
) объедините имя столбца и связанные столбцы значений в struct
(например, struct(lit(c1), c1_cnt, c1_wts)
)
- Поместите все эти
struct
-типированные столбцы в массив, который затем explode
-это в ряды struct
столбцов
- Группировка по названию сводного столбца для агрегирования связанных
struct
элементов
Следующий пример кода был обобщен для обработки произвольного списка столбцов для транспонирования:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("AAA", "VVVV", "SSSS", 3, 4, 5, 0.5, 0.4, 0.6),
("BBB", "BBBB", "TTTT", 3, 4, 5, 0.5, 0.4, 0.6),
("CCC", "DDDD", "YYYY", 3, 4, 5, 0.5, 0.4, 0.6)
).toDF("c1", "c2", "c3", "c1_cnt", "c2_cnt", "c3_cnt", "c1_wts", "c2_wts", "c3_wts")
val pivotCols = Seq("c1", "c2", "c3")
val valueColSfx = Seq("_cnt", "_wts")
val arrStructs = pivotCols.map{ c => struct(
Seq(lit(c).as("_pvt")) ++
valueColSfx.map((c, _)).map{ case (p, s) => col(p + s).as(s) }: _*
).as(c + "_struct")
}
val valueColAgg = valueColSfx.map(s => first($"struct_col.$s").as(s + "_first"))
df.
select(array(arrStructs: _*).as("arr_structs")).
withColumn("struct_col", explode($"arr_structs")).
groupBy($"struct_col._pvt").agg(valueColAgg.head, valueColAgg.tail: _*).
show
// +----+----------+----------+
// |_pvt|_cnt_first|_wts_first|
// +----+----------+----------+
// | c1| 3| 0.5|
// | c3| 5| 0.6|
// | c2| 4| 0.4|
// +----+----------+----------+
Обратите внимание, что функция first
используется в приведенном выше примере, но это может быть любая другая агрегатная функция (например, avg
, max
, collect_list
) в зависимости от конкретных бизнес-требований.