Я бы просто сделал следующее:
val a = Seq((1,"NY",0),(1,"WDC",10),(1,"NY",11),(2,"NY",20),(2,"WDC",15))
.toDF("client", "city", "timestamp")
val w = Window.partitionBy($"client").orderBy($"timestamp")
val b = a.withColumn("sorted_list", collect_list($"timestamp").over(w))
Здесь я использовал Window
для разбиения по клиенту и упорядочил по отметке времени. На данный момент у вас есть такой фрейм данных:
+------+----+---------+-----------+
|client|city|timestamp|sorted_list|
+------+----+---------+-----------+
|1 |NY |0 |[0] |
|1 |WDC |10 |[0, 10] |
|1 |NY |11 |[0, 10, 11]|
|2 |WDC |15 |[15] |
|2 |NY |20 |[15, 20] |
+------+----+---------+-----------+
Здесь вы создали новый столбец sorted_list имеет упорядоченный список значений, отсортированный по отметке времени, но у вас есть дублированные строки для каждого клиента.Чтобы удалить дубликаты, groupBy
клиент и оставьте максимальное значение для каждой группы:
val c = b
.groupBy($"client")
.agg(max($"sorted_list").alias("sorted_timestamp"))
.show(false)
+------+----------------+
|client|sorted_timestamp|
+------+----------------+
|1 |[0, 10, 11] |
|2 |[15, 20] |
+------+----------------+