Ваша спецификация окна верна. Используя фиктивный набор данных, давайте сначала загрузим вашу исходную таблицу Hive в DataFrame:
val df = spark.sql("""select * from table1""")
df.show
// +-------------+-------------------+
// |key_timestamp| datime|
// +-------------+-------------------+
// | 1|2018-06-01 00:00:00|
// | 1|2018-07-01 00:00:00|
// | 2|2018-05-01 00:00:00|
// | 2|2018-07-01 00:00:00|
// | 2|2018-06-01 00:00:00|
// +-------------+-------------------+
Чтобы применить функцию Window row_number
(через спецификацию Window) к DataFrame, используйте withColumn
, чтобы создать новый столбец для захвата результата функции:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy($"key_timestamp").orderBy($"datime".desc)
val resultDF = df.withColumn("rn", row_number.over(window))
resultDF.show
// +-------------+-------------------+---+
// |key_timestamp| datime| rn|
// +-------------+-------------------+---+
// | 1|2018-07-01 00:00:00| 1|
// | 1|2018-06-01 00:00:00| 2|
// | 2|2018-07-01 00:00:00| 1|
// | 2|2018-06-01 00:00:00| 2|
// | 2|2018-05-01 00:00:00| 3|
// +-------------+-------------------+---+
Чтобы проверить, запустите ваш SQL против table1
, и вы должны получить тот же результат:
spark.sql("""
select *, row_number() over
(partition by key_timestamp order by datime desc) rn
from table1
""").show