Я бы не знал, есть ли лучший способ, но вам нужно где-то создавать записи.Лаг не делает этого.Итак, сначала вам нужно сгенерировать новые записи на основе текущих.Тогда вы могли бы использовать функцию задержки.
Может быть что-то вроде этого:
data
// convert the string to an actual date
.withColumn("yearmonth", to_date('yyyyMM, "yyyyMM"))
// for each record create 2 additional in the future (with 0 amount)
.select(
explode(array(
// org record
struct('id, date_format('yearmonth, "yyyyMM").as("yearmonth"), 'amount),
// 1 month in future
struct('id, date_format(add_months('yearmonth, 1), "yyyyMM").as("yearmonth"), lit(0).as("amount")),
// 2 months in future
struct('id, date_format(add_months('yearmonth, 2), "yyyyMM").as("yearmonth"), lit(0).as("amount"))
)).as("record"))
// keep 1 record per month
.groupBy($"record.yearmonth")
.agg(
min($"record.id").as("id"),
sum($"record.amount").as("amount")
)
// final structure (with lag fields)
.select(
'id,
'yearmonth,
'amount,
lag('yearmonth, 1).over(orderByWindow).as("yearmonth-1"),
lag('amount, 1, 0).over(orderByWindow).as("amount2"),
lag('yearmonth, 2).over(orderByWindow).as("yearmonth-2"),
lag('amount, 2, 0).over(orderByWindow).as("amount3")
)
.orderBy('yearmonth.desc)
Это не идеально, но это начало
+---+---------+------+-----------+-------+-----------+-------+
|id |yearmonth|amount|yearmonth-1|amount2|yearmonth-2|amount3|
+---+---------+------+-----------+-------+-----------+-------+
|1 |201709 |0.0 |201708 |0.0 |201707 |10.0 |
|1 |201708 |0.0 |201707 |10.0 |201706 |5.0 |
|1 |201707 |10.0 |201706 |5.0 |201606 |0.0 |
|1 |201706 |5.0 |201606 |0.0 |201605 |0.0 |
|2 |201606 |0.0 |201605 |0.0 |201604 |12.0 |
|2 |201605 |0.0 |201604 |12.0 |201603 |0.0 |
|2 |201604 |12.0 |201603 |0.0 |201602 |0.0 |
|2 |201603 |0.0 |201602 |0.0 |201601 |15.0 |
|2 |201602 |0.0 |201601 |15.0 |null |0.0 |
|2 |201601 |15.0 |null |0.0 |null |0.0 |
+---+---------+------+-----------+-------+-----------+-------+