Вы можете собрать свое окно в список, чтобы затем найти два его величайших элемента.
Давайте начнем с примера фрейма данных:
import pyspark.sql.functions as psf
from pyspark.sql import Window
import pandas as pd
import datetime as dt
from random import randint
df = spark.createDataFrame(pd.DataFrame(
[[i % 3, i % 5, dt.date(2010, 1, (i) % 31 + 1), randint(0, 10)] for i in range(100)],
columns=['k1', 'k2', 'time', 'xcol'])) \
.orderBy('k1', 'k2', 'time')
df.show()
Мы будем использовать ту же спецификацию окна, что и определенную вами:
w = Window.partitionBy("k1","k2").orderBy("time").rowsBetween(-5, 0)
df \
.withColumn("sequence", psf.sort_array(psf.collect_list(psf.col('xcol')).over(w), asc=False)) \
.select('*', psf.col("sequence")[0].alias('max1'), psf.col("sequence")[1].alias('max2')) \
.show()
+---+---+----------+----+------------------+----+----+
| k1| k2| time|xcol| sequence|max1|max2|
+---+---+----------+----+------------------+----+----+
| 1| 3|2010-01-12| 3| [3]| 3|null|
| 1| 3|2010-01-13| 3| [3, 3]| 3| 3|
| 1| 3|2010-01-14| 9| [9, 3, 3]| 9| 3|
| 1| 3|2010-01-27| 7| [9, 7, 3, 3]| 9| 7|
| 1| 3|2010-01-28| 2| [9, 7, 3, 3, 2]| 9| 7|
| 1| 3|2010-01-29| 0|[9, 7, 3, 3, 2, 0]| 9| 7|
| 1| 0|2010-01-09| 6| [6]| 6|null|
| 1| 0|2010-01-10| 4| [6, 4]| 6| 4|
| 1| 0|2010-01-11| 2| [6, 4, 2]| 6| 4|
| 1| 0|2010-01-24| 0| [6, 4, 2, 0]| 6| 4|
| 1| 0|2010-01-25| 6| [6, 6, 4, 2, 0]| 6| 6|
| 1| 0|2010-01-26| 2|[6, 6, 4, 2, 2, 0]| 6| 6|
| 1| 1|2010-01-01| 9| [9]| 9|null|
| 1| 1|2010-01-02| 8| [9, 8]| 9| 8|
| 1| 1|2010-01-15| 2| [9, 8, 2]| 9| 8|
| 1| 1|2010-01-16| 7| [9, 8, 7, 2]| 9| 8|
| 1| 1|2010-01-17| 9| [9, 9, 8, 7, 2]| 9| 9|
| 1| 1|2010-01-30| 9|[9, 9, 9, 8, 7, 2]| 9| 9|
| 1| 1|2010-01-31| 6|[9, 9, 8, 7, 6, 2]| 9| 9|
| 0| 1|2010-01-04| 1| [1]| 1|null|
+---+---+----------+----+------------------+----+----+
Вы можете использовать collect_set
если вы хотите разные значения