Предполагая, что у меня есть следующий DataFrame:
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 3|null| 11|
| 2| null| 2| xxx| 22|
| 1| null| 1| yyy|null|
| 2| null| 7|null| 33|
| 1| null| 12|null|null|
| 2| null| 19|null| 77|
| 1| null| 10| s13|null|
| 2| null| 11| a23|null|
+---+--------+---+----+----+
вот тот же пример DF с комментариями, отсортированный по grp
и ord
:
scala> df.orderBy("grp", "ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 1| yyy|null|
| 1| null| 3|null| 11| # grp:1 - last value for `col2` (11)
| 1| null| 10| s13|null| # grp:1 - last value for `col1` (s13)
| 1| null| 12|null|null| # grp:1 - last values for `null_col`, `ord`
| 2| null| 2| xxx| 22|
| 2| null| 7|null| 33|
| 2| null| 11| a23|null| # grp:2 - last value for `col1` (a23)
| 2| null| 19|null| 77| # grp:2 - last values for `null_col`, `ord`, `col2`
+---+--------+---+----+----+
Я бы хотел сжать его. То есть сгруппировать его по столбцу "grp"
и для каждой группы отсортировать строки по столбцу "ord"
и взять последнее значение not null
в каждом столбце (если оно есть).
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 12| s13| 11|
| 2| null| 19| a23| 77|
+---+--------+---+----+----+
Я видел следующие похожие вопросы:
но мой настоящий DataFrame имеет более 250 столбцов, поэтому мне нужно решение, в котором мне не нужно явно указывать все столбцы.
Я не могу обернуть голову вокруг этого ...
MCVE: как создать образец DataFrame:
- создайте локальный файл "/tmp/data.txt", скопируйте и вставьте туда контекст DataFrame (как он опубликован выше)
- определение функция
readSparkOutput()
:
синтаксический анализ "/tmp/data.txt" в DataFrame:
val df = readSparkOutput("file:///tmp/data.txt")
ОБНОВЛЕНИЕ: Я думаю, что это должно быть похоже на следующий SQL:
SELECT
grp, ord, null_col, col1, col2
FROM (
SELECT
grp,
ord,
FIRST(null_col) OVER (PARTITION BY grp ORDER BY ord DESC) as null_col,
FIRST(col1) OVER (PARTITION BY grp ORDER BY ord DESC) as col1,
FIRST(col2) OVER (PARTITION BY grp ORDER BY ord DESC) as col2,
ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ord DESC) as rn
FROM table_name) as v
WHERE v.rn = 1;
как мы можем динамически генерировать такой запрос Spark?
Я попробовал следующий упрощенный подход:
import org.apache.spark.sql.expressions.Window
val win = Window
.partitionBy("grp")
.orderBy($"ord".desc)
val cols = df.columns.map(c => first(c, ignoreNulls=true).over(win).as(c))
, который производит:
scala> cols
res23: Array[org.apache.spark.sql.Column] = Array(first(grp, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `grp`, first(null_col, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `null_col`, first(ord, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `ord`, first(col1, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col1`, first(col2, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col2`)
но я не смог передать его df.select
:
scala> df.select(cols.head, cols.tail: _*).show
<console>:34: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
df.select(cols.head, cols.tail: _*).show
еще одна попытка:
scala> df.select(cols.map(col): _*).show
<console>:34: error: type mismatch;
found : String => org.apache.spark.sql.Column
required: org.apache.spark.sql.Column => ?
df.select(cols.map(col): _*).show