поддержание порядка collect_list (sql / spark scala) - PullRequest
0 голосов
/ 27 сентября 2019

У меня есть такая таблица:

Clients   City   Timestamp
1         NY        0
1         WDC       10
1         NY        11    
2         NY        20
2         WDC       15

В качестве выхода я хочу собрать все города на основе отметки времени (каждая отметка времени имеет уникальный город для пользователя).Но без отображения метки времени .Финальный список должен содержать только города по порядку.Так, для этого примера, это дает что-то вроде этого:

Clients   my_list   Timestamp
1         NY - WDC - NY
2         WDC - NY

Может быть, я должен сгенерировать список, используя метку времени.Затем удалите метку времени из этого списка.Я не знаю ...

Я использую Spark SQL с Scala.Итак, я попытался использовать collect_list как в sql, так и в scala, но похоже, что мы теряем порядок после его использования.

Можете ли вы помочь мне решить эту проблему?

Ответы [ 2 ]

0 голосов
/ 27 сентября 2019

Я бы просто сделал следующее:

val a = Seq((1,"NY",0),(1,"WDC",10),(1,"NY",11),(2,"NY",20),(2,"WDC",15))
    .toDF("client", "city", "timestamp")

val w = Window.partitionBy($"client").orderBy($"timestamp")
val b = a.withColumn("sorted_list", collect_list($"timestamp").over(w))

Здесь я использовал Window для разбиения по клиенту и упорядочил по отметке времени. На данный момент у вас есть такой фрейм данных:

+------+----+---------+-----------+
|client|city|timestamp|sorted_list|
+------+----+---------+-----------+
|1     |NY  |0        |[0]        |
|1     |WDC |10       |[0, 10]    |
|1     |NY  |11       |[0, 10, 11]|
|2     |WDC |15       |[15]       |
|2     |NY  |20       |[15, 20]   |
+------+----+---------+-----------+

Здесь вы создали новый столбец sorted_list имеет упорядоченный список значений, отсортированный по отметке времени, но у вас есть дублированные строки для каждого клиента.Чтобы удалить дубликаты, groupBy клиент и оставьте максимальное значение для каждой группы:

val c = b
        .groupBy($"client")
        .agg(max($"sorted_list").alias("sorted_timestamp"))
.show(false)

+------+----------------+
|client|sorted_timestamp|
+------+----------------+
|1     |[0, 10, 11]     |
|2     |[15, 20]        |
+------+----------------+

0 голосов
/ 27 сентября 2019
# below can be helpful for you to achieve your target
val input_rdd = spark.sparkContext.parallelize(List(("1","NY","0"),("1","WDC","10"),("1","NY","11"),("2","NY","20"),("2","WDC","15")))
val input_df = input_rdd.toDF("clients","city","Timestamp")
val winspec1 = Window.partitionBy($"clients").orderBy($"Timestamp")
val input_df1 = input_df.withColumn("collect", collect_list($"city").over(winspec1))
input_df1.show
Output:
+-------+----+---------+-------------+
|clients|city|Timestamp|      collect|
+-------+----+---------+-------------+
|      1|  NY|        0|         [NY]|
|      1| WDC|       10|    [NY, WDC]|
|      1|  NY|       11|[NY, WDC, NY]|
|      2| WDC|       15|        [WDC]|
|      2|  NY|       20|    [WDC, NY]|
+-------+----+---------+-------------+

val winspec2 = Window.partitionBy($"clients").orderBy($"Timestamp".desc)
input_df1.withColumn("number", row_number().over(winspec2)).filter($"number" === 1).drop($"number").drop($"Timestamp").drop($"city").show
Output:
+-------+-------------+
|clients|      collect|
+-------+-------------+
|      1|[NY, WDC, NY]|
|      2|    [WDC, NY]|
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...