Spark Dataframe - Как сохранить только последнюю запись для каждой группы на основе идентификатора и даты? - PullRequest
0 голосов
/ 23 января 2020

У меня есть фрейм данных:

DF:

1,2016-10-12 18:24:25
1,2016-11-18 14:47:05
2,2016-10-12 21:24:25
2,2016-10-12 20:24:25
2,2016-10-12 22:24:25
3,2016-10-12 17:24:25

Как сохранить только последнюю запись для каждой группы? (есть 3 группы выше (1,2,3)).

Результат должен быть:

1,2016-11-18 14:47:05
2,2016-10-12 22:24:25
3,2016-10-12 17:24:25

Попытка также сделать его эффективным (например, до финиша sh в течение нескольких коротких минут на умеренном кластере (100 миллионов записей), поэтому сортировку / упорядочение следует выполнять (если они требуются) наиболее эффективным и правильным способом.

Ответы [ 2 ]

0 голосов
/ 23 января 2020

Вы должны использовать оконную функцию.

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark. sql .Window

Вы должны разделить окно по группам и OrderBy по времени. ниже скрипт pyspark делает работу

from pyspark.sql.functions import *
from pyspark.sql.window import Window

schema = "Group int,time timestamp "

df = spark.read.format('csv').schema(schema).options(header=False).load('/FileStore/tables/Group_window.txt')


w = Window.partitionBy('Group').orderBy(desc('time'))
df = df.withColumn('Rank',dense_rank().over(w))

df.filter(df.Rank == 1).drop(df.Rank).show()


+-----+-------------------+
|Group|               time|
+-----+-------------------+
|    1|2016-11-18 14:47:05|
|    3|2016-10-12 17:24:25|
|    2|2016-10-12 22:24:25|
+-----+-------------------+ ```





0 голосов
/ 23 января 2020

Вы можете использовать оконные функции, как описано здесь для случаев, подобных этому:

scala> val in = Seq((1,"2016-10-12 18:24:25"),
     | (1,"2016-11-18 14:47:05"),
     | (2,"2016-10-12 21:24:25"),
     | (2,"2016-10-12 20:24:25"),
     | (2,"2016-10-12 22:24:25"),
     | (3,"2016-10-12 17:24:25")).toDF("id", "ts")
in: org.apache.spark.sql.DataFrame = [id: int, ts: string]
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val win = Window.partitionBy("id").orderBy('ts desc)
win: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@59fa04f7
scala> in.withColumn("rank", row_number().over(win)).where('rank === 1).show(false)
+---+-------------------+----+
| id|                 ts|rank|
+---+-------------------+----+
|  1|2016-11-18 14:47:05|   1|
|  3|2016-10-12 17:24:25|   1|
|  2|2016-10-12 22:24:25|   1|
+---+-------------------+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...