Список неупорядочен даже после использования управления окнами с collect_set - PullRequest
0 голосов
/ 16 марта 2020

Итак, я пытаюсь собрать группу дат из фрейма данных. Проблема, с которой я сталкиваюсь, заключается в том, что даты не присутствуют в порядке, в котором находится фрейм данных.

Example dataframe (this is a much larger dataset. Basically this dataframe tracks the 
beginning date of a week, for every single day in a year)
+--------+-----------+----------+
|year_num|week_beg_dt|    cal_dt|
+--------+-----------+----------+
|    2013| 2012-12-31|2012-12-31|
|    2013| 2012-12-31|2013-01-03|
|    2013| 2013-01-07|2013-01-07|
|    2013| 2013-01-07|2013-01-12|
|    2013| 2013-01-14|2013-01-14|
|    2013| 2013-01-14|2013-01-15|
|    2014| 2014-01-01|2014-01-01|
|    2014| 2014-01-01|2014-01-05|
|    2014| 2014-01-07|2014-01-07|
|    2014| 2014-01-07|2014-01-12|
|    2014| 2014-01-15|2014-01-15|
|    2014| 2014-01-15|2014-01-16|

What Im trying to get to is this
+--------+-------------------------------------+
|year_num|   dates.                            |
+--------+-------------------------------------+
|    2013|[2012-12-31, 2013-01-07, 2013-01-14] |
|    2014|[2014-01-01, 2014-01-07, 2014-01-14] |

Я попытался сделать это с помощью окон, так как collect_set вместе с groupBy приведет к неупорядоченному набору:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('year_num').orderBy('week_beg_dt')

business_days_ = df2.withColumn('dates', F.collect_set('week_beg_dt').over(w)) \
                    .groupBy('year_num') \
                    .agg(F.max('dates').alias('dates')) \
                    .collect()

Но я все равно получаю неупорядоченные множества. Есть предложения, что я делаю не так и как это исправить?

1 Ответ

2 голосов
/ 16 марта 2020

Для Spark 2.4 + , используйте array_sort во встроенной функции на collect_set, чтобы получить упорядоченный список.

Example:

df1.show()

#+--------+-----------+----------+
#|year_num|week_beg_dt|    cal_dt|
#+--------+-----------+----------+
#|    2013| 2012-12-31|2012-12-31|
#|    2013| 2012-12-31|2012-12-31|
#|    2013| 2013-01-07|2013-01-03|
#+--------+-----------+----------+

#without array_sort
df1.groupBy("year_num").agg(collect_set(col("week_beg_dt"))).show(10,False)
#+--------+------------------------+
#|year_num|collect_set(week_beg_dt)|
#+--------+------------------------+
#|2013    |[2013-01-07, 2012-12-31]|
#+--------+------------------------+

#using array_sort

df1.groupBy("year_num").agg(array_sort(collect_set(col("week_beg_dt")))).show(10,False)
#+--------+------------------------------------+
#|year_num|array_sort(collect_set(week_beg_dt))|
#+--------+------------------------------------+
#|2013    |[2012-12-31, 2013-01-07]            |
#+--------+------------------------------------+

Для более ранних версий Spark:

from pyspark.sql.types import *

#udf to sort array
sort_arr_udf=udf(lambda x:sorted(x),ArrayType(StringType()))

df1.groupBy("year_num").agg(sort_arr_udf(collect_set(col("week_beg_dt")))).show(10,False)

#+--------+----------------------------------------+
#|year_num|<lambda>(collect_set(week_beg_dt, 0, 0))|
#+--------+----------------------------------------+
#|2013    |[2012-12-31, 2013-01-07]                |
#+--------+----------------------------------------+
...