Я украл данные из вашего предыдущего вопроса для этого, так как мне было лень делать это самостоятельно, и какой-то хороший парень создал список для ввода данных там.
Поскольку окно скользит по числу записей, скорее чем количество месяцев, я объединил все записи за данный месяц (сгруппированные по client_id
и value1
, конечно) в одну запись в .groupBy("client_id", "value1", "year_val", "month_val")
, которая присутствует в вычислениях для df2
from pyspark.sql import functions as F
from pyspark.sql.window import Window
data= [['dhd',589,'ecdu','2020-1-5'],
['dhd',575,'tygp','2020-1-5'],
['dhd',821,'rdsr','2020-1-5'],
['dhd',872,'rgvd','2019-12-1'],
['dhd',619,'bhnd','2019-12-15'],
['dhd',781,'prti','2019-12-18'],
['dhd',781,'prti1','2019-12-18'],
['dhd',781,'prti2','2019-11-18'],
['dhd',781,'prti3','2019-10-31'],
['dhd',781,'prti4','2019-09-30'],
['dhd',781,'prt1','2019-07-31'],
['dhd',781,'pr4','2019-06-30'],
['dhd',781,'pr2','2019-08-31'],
['dhd',781,'prt4','2019-01-31'],
['dhd',781,'prti6','2019-02-28'],
['dhd',781,'prti7','2019-02-02'],
['dhd',781,'prti8','2019-03-29'],
['dhd',781,'prti9','2019-04-29'],
['dhd',781,'prti10','2019-05-04'],
['dhd',781,'prti11','2019-03-01']]
columns= ['client_id','value1','name1','a_date']
df= spark.createDataFrame(data,columns)
df2 = df.withColumn("year_val", F.year("a_date"))\
.withColumn("month_val", F.month("a_date"))\
.groupBy("client_id", "value1", "year_val", "month_val")\
.agg(F.concat_ws(", ", F.collect_list("name1")).alias("init_list"))
df2.show()
Здесь мы получаем init_list
как:
+---------+------+--------+---------+-------------+
|client_id|value1|year_val|month_val| init_list|
+---------+------+--------+---------+-------------+
| dhd| 781| 2019| 12| prti, prti1|
| dhd| 589| 2020| 1| ecdu|
| dhd| 781| 2019| 8| pr2|
| dhd| 781| 2019| 3|prti8, prti11|
| dhd| 575| 2020| 1| tygp|
| dhd| 781| 2019| 5| prti10|
| dhd| 781| 2019| 9| prti4|
| dhd| 781| 2019| 11| prti2|
| dhd| 781| 2019| 10| prti3|
| dhd| 821| 2020| 1| rdsr|
| dhd| 781| 2019| 6| pr4|
| dhd| 619| 2019| 12| bhnd|
| dhd| 781| 2019| 7| prt1|
| dhd| 781| 2019| 4| prti9|
| dhd| 781| 2019| 1| prt4|
| dhd| 781| 2019| 2| prti6, prti7|
| dhd| 872| 2019| 12| rgvd|
+---------+------+--------+---------+-------------+
Используя это, мы можем получить конечный результат, просто запустив окно над записями:
month_range = 6
w = Window().partitionBy("client_id", "value1")\
.orderBy("month_val")\
.rangeBetween(-(month_range+1),0)
df3 = df2.withColumn("last_0_month", F.collect_list(F.col("init_list")).over(w))\
.orderBy("value1", "year_val", "month_val")
df3.show(100,False)
Что дает нас:
+---------+------+--------+---------+-------------+-------------------------------------------------------------------+
|client_id|value1|year_val|month_val|init_list |last_0_month |
+---------+------+--------+---------+-------------+-------------------------------------------------------------------+
|dhd |575 |2020 |1 |tygp |[tygp] |
|dhd |589 |2020 |1 |ecdu |[ecdu] |
|dhd |619 |2019 |12 |bhnd |[bhnd] |
|dhd |781 |2019 |1 |prt4 |[prt4] |
|dhd |781 |2019 |2 |prti6, prti7 |[prt4, prti6, prti7] |
|dhd |781 |2019 |3 |prti8, prti11|[prt4, prti6, prti7, prti8, prti11] |
|dhd |781 |2019 |4 |prti9 |[prt4, prti6, prti7, prti8, prti11, prti9] |
|dhd |781 |2019 |5 |prti10 |[prt4, prti6, prti7, prti8, prti11, prti9, prti10] |
|dhd |781 |2019 |6 |pr4 |[prt4, prti6, prti7, prti8, prti11, prti9, prti10, pr4] |
|dhd |781 |2019 |7 |prt1 |[prt4, prti6, prti7, prti8, prti11, prti9, prti10, pr4, prt1] |
|dhd |781 |2019 |8 |pr2 |[prt4, prti6, prti7, prti8, prti11, prti9, prti10, pr4, prt1, pr2] |
|dhd |781 |2019 |9 |prti4 |[prti6, prti7, prti8, prti11, prti9, prti10, pr4, prt1, pr2, prti4]|
|dhd |781 |2019 |10 |prti3 |[prti8, prti11, prti9, prti10, pr4, prt1, pr2, prti4, prti3] |
|dhd |781 |2019 |11 |prti2 |[prti9, prti10, pr4, prt1, pr2, prti4, prti3, prti2] |
|dhd |781 |2019 |12 |prti, prti1 |[prti10, pr4, prt1, pr2, prti4, prti3, prti2, prti, prti1] |
|dhd |821 |2020 |1 |rdsr |[rdsr] |
|dhd |872 |2019 |12 |rgvd |[rgvd] |
+---------+------+--------+---------+-------------+-------------------------------------------------------------------+
Ограничения:
К сожалению, из-за второй части поле a_date
теряется и для операций со скользящим окном с определенным для них диапазоном, orderBy
не может указывать несколько столбцов (обратите внимание, что orderBy
в определении окна только на month_val
). По этой причине это точное решение не будет работать для данных, охватывающих несколько лет. Однако это можно легко преодолеть, если в столбце, объединяющем значения года и месяца, использовать что-то вроде month_id, а затем использовать его в предложении orderBy
.
Если вы хотите иметь несколько windows, вы можно преобразовать month_range
в список и l oop поверх него в последнем фрагменте кода, чтобы охватить все диапазоны.
Хотя последний столбец (last_0_month
) выглядит как массив, он содержит строки, разделенные запятыми с предыдущей операции agg
. Вы можете также убрать это.