Scala программа поиска самых последних значений - PullRequest
0 голосов
/ 29 августа 2018

Я хочу создать DF, основанный на улье sql ниже:

WITH FILTERED_table1 AS (select *
, row_number() over (partition by key_timestamp order by datime DESC) rn
FROM table1)

scala function:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val table1 = Window.partitionBy($"key_timestamp").orderBy($"datime".desc)

Я посмотрел на оконную функцию, и это было то, что я могу придумать, я не уверен, как написать это в функции scala, так как я очень новичок в scala. Как вернуть DF из SQL использовать функции Scala? Любые предложения будут с благодарностью. :)

1 Ответ

0 голосов
/ 29 августа 2018

Ваша спецификация окна верна. Используя фиктивный набор данных, давайте сначала загрузим вашу исходную таблицу Hive в DataFrame:

val df = spark.sql("""select * from table1""")

df.show
// +-------------+-------------------+
// |key_timestamp|             datime|
// +-------------+-------------------+
// |            1|2018-06-01 00:00:00|
// |            1|2018-07-01 00:00:00|
// |            2|2018-05-01 00:00:00|
// |            2|2018-07-01 00:00:00|
// |            2|2018-06-01 00:00:00|
// +-------------+-------------------+

Чтобы применить функцию Window row_number (через спецификацию Window) к DataFrame, используйте withColumn, чтобы создать новый столбец для захвата результата функции:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy($"key_timestamp").orderBy($"datime".desc)

val resultDF = df.withColumn("rn", row_number.over(window))

resultDF.show
// +-------------+-------------------+---+
// |key_timestamp|             datime| rn|
// +-------------+-------------------+---+
// |            1|2018-07-01 00:00:00|  1|
// |            1|2018-06-01 00:00:00|  2|
// |            2|2018-07-01 00:00:00|  1|
// |            2|2018-06-01 00:00:00|  2|
// |            2|2018-05-01 00:00:00|  3|
// +-------------+-------------------+---+

Чтобы проверить, запустите ваш SQL против table1, и вы должны получить тот же результат:

spark.sql("""
    select *, row_number() over
      (partition by key_timestamp order by datime desc) rn
    from table1
  """).show
...