Как преобразовать столбцы в строки в Spark Scala или Spark SQL? - PullRequest
0 голосов
/ 26 апреля 2019

У меня есть такие данные.

+------+------+------+----------+----------+----------+----------+----------+----------+
| Col1 | Col2 | Col3 | Col1_cnt | Col2_cnt | Col3_cnt | Col1_wts | Col2_wts | Col3_wts |
+------+------+------+----------+----------+----------+----------+----------+----------+
| AAA  | VVVV | SSSS |        3 |        4 |        5 |      0.5 |      0.4 |      0.6 |
| BBB  | BBBB | TTTT |        3 |        4 |        5 |      0.5 |      0.4 |      0.6 |
| CCC  | DDDD | YYYY |        3 |        4 |        5 |      0.5 |      0.4 |      0.6 |
+------+------+------+----------+----------+----------+----------+----------+----------+

Я пытался, но здесь я не получаю никакой помощи.

val df = Seq(("G",Some(4),2,None),("H",None,4,Some(5))).toDF("A","X","Y", "Z")

Я хочу вывод в виде таблицы ниже

+-----------+---------+---------+
| Cols_name | Col_cnt | Col_wts |
+-----------+---------+---------+
| Col1      |       3 |     0.5 |
| Col2      |       4 |     0.4 |
| Col3      |       5 |     0.6 |
+-----------+---------+---------+

1 Ответ

0 голосов
/ 27 апреля 2019

Вот общий подход для транспонирования DataFrame:

  1. Для каждого из основных столбцов (скажем, c1, c2, c3) объедините имя столбца и связанные столбцы значений в struct (например, struct(lit(c1), c1_cnt, c1_wts))
  2. Поместите все эти struct -типированные столбцы в массив, который затем explode -это в ряды struct столбцов
  3. Группировка по названию сводного столбца для агрегирования связанных struct элементов

Следующий пример кода был обобщен для обработки произвольного списка столбцов для транспонирования:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("AAA", "VVVV", "SSSS", 3, 4, 5, 0.5, 0.4, 0.6),
  ("BBB", "BBBB", "TTTT", 3, 4, 5, 0.5, 0.4, 0.6),
  ("CCC", "DDDD", "YYYY", 3, 4, 5, 0.5, 0.4, 0.6)
).toDF("c1", "c2", "c3", "c1_cnt", "c2_cnt", "c3_cnt", "c1_wts", "c2_wts", "c3_wts")

val pivotCols = Seq("c1", "c2", "c3")

val valueColSfx = Seq("_cnt", "_wts")

val arrStructs = pivotCols.map{ c => struct(
    Seq(lit(c).as("_pvt")) ++
      valueColSfx.map((c, _)).map{ case (p, s) => col(p + s).as(s) }: _*
  ).as(c + "_struct")
}

val valueColAgg = valueColSfx.map(s => first($"struct_col.$s").as(s + "_first"))

df.
  select(array(arrStructs: _*).as("arr_structs")).
  withColumn("struct_col", explode($"arr_structs")).
  groupBy($"struct_col._pvt").agg(valueColAgg.head, valueColAgg.tail: _*).
  show
// +----+----------+----------+
// |_pvt|_cnt_first|_wts_first|
// +----+----------+----------+
// |  c1|         3|       0.5|
// |  c3|         5|       0.6|
// |  c2|         4|       0.4|
// +----+----------+----------+

Обратите внимание, что функция first используется в приведенном выше примере, но это может быть любая другая агрегатная функция (например, avg, max, collect_list) в зависимости от конкретных бизнес-требований.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...