Here is a method using window functions:
```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# set up the DataFrame
df = sc.parallelize([["line1"], ["line2"], ["line3..ERROR"], ["line4"], ["line5"]]).toDF(['col'])

# create an indicator that marks a boundary at each error occurrence
win1 = Window.orderBy('col')
df = df.withColumn('hit_error', F.expr("case when col like '%ERROR%' then 1 else 0 end"))
df = df.withColumn('cum_error', F.sum('hit_error').over(win1))

# now number the lines within each error group
win2 = Window.partitionBy('cum_error').orderBy('col')
df = df.withColumn('rownum', F.row_number().over(win2))

# row 1 in each group is the error line itself; rows 2 and 3 are the two lines that follow it
df.filter("cum_error > 0 and rownum in (2,3)").select("col").show(10)
```