Вот решение, использующее PySpark Window, lead / udf.
Обратите внимание, что я изменил цены 5,6,7 ранга на 1,2,3, чтобы дифференцировать их с другими значениями для объяснения.что эта логика выбирает то, что вы объяснили.
max_price_udf = udf(lambda prices_list: max(prices_list), IntegerType())
df = spark.createDataFrame([(1, 5, 2019,1,20),(2, 4, 2019,2,18),
(3, 3, 2019,3,21),(4, 2, 2019,4,20),
(5, 1, 2019,5,1),(6, 52, 2018,6,2),
(7, 51, 2018,7,3)], ["product_id", "week", "year","rank","price"])
window = Window.orderBy(col("year").desc(),col("week").desc())
df = df.withColumn("prices_list", array([coalesce(lead(col("price"),x, None).over(window),lead(col("price"),x-3, None).over(window)) for x in range(1, 4)]))
df = df.withColumn("max_price",max_price_udf(col("prices_list")))
df.show()
, что приводит к
+----------+----+----+----+-----+------------+---------+
|product_id|week|year|rank|price| prices_list|max_price|
+----------+----+----+----+-----+------------+---------+
| 1| 5|2019| 1| 20|[18, 21, 20]| 21|
| 2| 4|2019| 2| 18| [21, 20, 1]| 21|
| 3| 3|2019| 3| 21| [20, 1, 2]| 20|
| 4| 2|2019| 4| 20| [1, 2, 3]| 3|
| 5| 1|2019| 5| 1| [2, 3, 1]| 3|
| 6| 52|2018| 6| 2| [3, 1, 2]| 3|
| 7| 51|2018| 7| 3| [1, 2, 3]| 3|
+----------+----+----+----+-----+------------+---------+
Вот решение в Scala
var df = Seq((1, 5, 2019, 1, 20), (2, 4, 2019, 2, 18),
(3, 3, 2019, 3, 21), (4, 2, 2019, 4, 20),
(5, 1, 2019, 5, 1), (6, 52, 2018, 6, 2),
(7, 51, 2018, 7, 3)).toDF("product_id", "week", "year", "rank", "price")
val window = Window.orderBy($"year".desc, $"week".desc)
df = df.withColumn("max_price", greatest((for (x <- 1 to 3) yield coalesce(lead(col("price"), x, null).over(window), lead(col("price"), x - 3, null).over(window))):_*))
df.show()