У меня есть кадр данных Spark, который выглядит примерно так:
id country date action
1 A 2019-01-01 suppress
1 A 2019-01-02 suppress
2 A 2019-01-03 bid-up
2 A 2019-01-04 bid-down
3 C 2019-01-01 no-action
3 C 2019-01-02 bid-up
4 D 2019-01-01 suppress
Я хочу уменьшить этот кадр данных, сгруппировав его по id, стране и собрав уникальные значения столбца 'action' в массив, но этот массив должен быть упорядочен по столбцу даты.
Например,
id country action_arr
1 A [suppress]
2 A [bid-up, bid-down]
3 C [no-action, bid-up]
4 D [suppress]
Чтобы объяснить это немного более кратко, у меня есть некоторый код SQL (presto), который делает именно то, что я хочу. .. я просто пытаюсь сделать это в PySpark или SparkSQL:
SELECT id, country, array_distinct(array_agg(action ORDER BY date ASC)) AS actions
FROM table
GROUP BY id, country
Теперь вот моя попытка в PySpark:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('action').orderBy('date')
sorted_list_df = df.withColumn('sorted_list', F.collect_set('action').over(w))
Тогда я хочу узнать количество случаевкаждого набора действий по группам:
df = sorted_list_df.select('country', 'sorted_list').groupBy('coutry', 'sorted_list').agg(F.count('sorted_list'))
Код выполняется, но в выходном столбце sorted_list он в основном совпадает с действием без агрегации массива. Может ли кто-нибудь помочь?
ПРАВИТЬМне удалось получить то, что я хочу ... но результаты не полностью соответствуют результатам до. Кто-нибудь может объяснить, почему? Решение ниже:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('action').orderBy('date')
df_2 = df.withColumn("sorted_list", F.collect_set("action").over(Window.partitionBy("id").orderBy("date")))
test = df_2.select('id', 'country', 'sorted_list')\
.dropDuplicates()\
.select('country', 'sorted_list')\
.groupBy('site_name', 'sorted_list')\
.agg(F.count('sorted_list'))