Решение PySpark
Образцы данных
df = spark.createDataFrame([(1,201901,10),
(1,201903,9),
(1,201904,21),
(1,201906,42),
(1,201909,3),
(1,201912,56)
],['id','weeknum','val'])
df.show()
+---+-------+---+
| id|weeknum|val|
+---+-------+---+
| 1| 201901| 10|
| 1| 201903| 9|
| 1| 201904| 21|
| 1| 201906| 42|
| 1| 201909| 3|
| 1| 201912| 56|
+---+-------+---+
1) Основная идея заключается в создании комбинации всех идентификаторов и недель (начиная с минимально возможного значения)до максимума) с cross join
.
from pyspark.sql.functions import min,max,sum,when
from pyspark.sql import Window
min_max_week = df.agg(min(df.weeknum),max(df.weeknum)).collect()
#Generate all weeks using range
all_weeks = spark.range(min_max_week[0][0],min_max_week[0][1]+1)
all_weeks = all_weeks.withColumnRenamed('id','weekno')
#all_weeks.show()
id_all_weeks = df.select(df.id).distinct().crossJoin(all_weeks).withColumnRenamed('id','aid')
#id_all_weeks.show()
2) После этого left join
исходный кадр данных для этих комбинаций помогает идентифицировать пропущенные значения.
res = id_all_weeks.join(df,(df.id == id_all_weeks.aid) & (df.weeknum == id_all_weeks.weekno),'left')
res.show()
+---+------+----+-------+----+
|aid|weekno| id|weeknum| val|
+---+------+----+-------+----+
| 1|201911|null| null|null|
| 1|201905|null| null|null|
| 1|201903| 1| 201903| 9|
| 1|201904| 1| 201904| 21|
| 1|201901| 1| 201901| 10|
| 1|201906| 1| 201906| 42|
| 1|201908|null| null|null|
| 1|201910|null| null|null|
| 1|201912| 1| 201912| 56|
| 1|201907|null| null|null|
| 1|201902|null| null|null|
| 1|201909| 1| 201909| 3|
+---+------+----+-------+----+
3) Затем используйте комбинацию оконных функций, sum
-> для назначения групп и max
-> для заполнения пропущенных значений после классификации групп.
w1 = Window.partitionBy(res.aid).orderBy(res.weekno)
groups = res.withColumn("grp",sum(when(res.id.isNull(),0).otherwise(1)).over(w1))
w2 = Window.partitionBy(groups.aid,groups.grp)
missing_values_filled = groups.withColumn('filled',max(groups.val).over(w2)) #select required columns as needed
missing_values_filled.show()
+---+------+----+-------+----+---+------+
|aid|weekno| id|weeknum| val|grp|filled|
+---+------+----+-------+----+---+------+
| 1|201901| 1| 201901| 10| 1| 10|
| 1|201902|null| null|null| 1| 10|
| 1|201903| 1| 201903| 9| 2| 9|
| 1|201904| 1| 201904| 21| 3| 21|
| 1|201905|null| null|null| 3| 21|
| 1|201906| 1| 201906| 42| 4| 42|
| 1|201907|null| null|null| 4| 42|
| 1|201908|null| null|null| 4| 42|
| 1|201909| 1| 201909| 3| 5| 3|
| 1|201910|null| null|null| 5| 3|
| 1|201911|null| null|null| 5| 3|
| 1|201912| 1| 201912| 56| 6| 56|
+---+------+----+-------+----+---+------+
Запрос улья с той же логикой, что и описанная выше (при условии, что может быть создана таблица со всеми неделями)
select id,weeknum,max(val) over(partition by id,grp) as val
from (select i.id
,w.weeknum
,t.val
,sum(case when t.id is null then 0 else 1 end) over(partition by i.id order by w.weeknum) as grp
from (select distinct id from tbl) i
cross join weeks_table w
left join tbl t on t.id = i.id and w.weeknum = t.weeknum
) t