Я придумала решение, но оно кажется мне излишне громоздким — буду рада любому более изящному варианту!
# Sample data: a single id whose `period` sequence has a gap (period 4 is
# missing), so "4 periods back" is not always "4 rows back".
data = spark.sparkContext.parallelize([
    ('a', 100, 1),
    ('a', 200, 2),
    ('a', 300, 3),
    ('a', 400, 5),
    ('a', 500, 6),
    ('a', 600, 7),
])
df = spark.createDataFrame(data, ['id', 'value', 'period'])
window = Window.partitionBy('id').orderBy('period')

# How many periods back the lagged value must come from.
PERIOD_GAP = 4

# Build the lagged value as one column expression instead of materialising
# and later dropping a helper column per row offset.
#
# For every row offset 1..PERIOD_GAP we check whether the row that many rows
# behind is exactly PERIOD_GAP periods behind; if so, its `value` is the one
# we want. Rows with no such predecessor keep the initial NULL.
# Each iteration wraps the previous expression in `.otherwise(...)`, so a
# larger offset takes precedence over a smaller one when both match.
# NOTE(review): scanning only PERIOD_GAP rows back presumably relies on
# `period` being an integer that is unique within each id -- confirm.
lagged_value = F.lit(None)
for row_offset in range(1, PERIOD_GAP + 1):
    # The offset is passed positionally: the keyword name of this argument
    # is not the same across PySpark releases (`count` in 2.x, `offset` in
    # 3.x), whereas the positional form works on both.
    period_gap = F.col('period') - F.lag('period', row_offset).over(window)
    candidate = F.lag('value', row_offset).over(window)
    lagged_value = (
        F.when(period_gap == PERIOD_GAP, candidate).otherwise(lagged_value)
    )

df = df.withColumn('{}_lag_value'.format(PERIOD_GAP), lagged_value)

# NOTE(review): a range-based frame, e.g.
#   Window.partitionBy('id').orderBy('period')
#         .rangeBetween(-PERIOD_GAP, -PERIOD_GAP)
# combined with F.first('value'), may express the same lookup without any
# loop; verify its handling of NULL / duplicate periods before switching.
Это возвращает желаемый результат.