В идеале вам необходимо:
- Создать окно, разделенное на
id
и упорядоченное таким же образом, как у кадра данных - Сохранять только те строки, для которых есть«один» перед ними в окне
AFAIK, в окнах Spark нет функции поиска в окнах.Тем не менее, вы можете следовать этой идее и решить что-тоДавайте сначала создадим данные и импортируем функции и окна.
import pyspark.sql.functions as F
from pyspark.sql.window import Window
l = [(1, 0), (1, 1), (1, 0), (2, 1), (2, 0), (3, 0), (3, 0), (3, 1)]
df = spark.createDataFrame(l, ['id', 'value'])
Затем добавим индекс на фрейм данных (это бесплатно), чтобы можно было упорядочивать окна.
indexedDf = df.withColumn("index", F.monotonically_increasing_id())
Затем мы создаем окно, которое просматривает только значения перед текущей строкой, упорядоченные по этому индексу и разделенные по id.
w = Window.partitionBy("id").orderBy("index").rowsBetween(Window.unboundedPreceding, 0)
Наконец, мы используем это окно для сбора набора предшествующих значений каждой строки.и отфильтруйте те, которые не содержат 1
.При желании мы можем вернуть заказ на index
, потому что управление окнами не сохраняет порядок на столбце id
.
indexedDf\
.withColumn('set', F.collect_set(F.col('value')).over(w))\
.where(F.array_contains(F.col('set'), 1))\
.orderBy("index")\
.select("id", "value").show()
+---+-----+
| id|value|
+---+-----+
| 1| 1|
| 1| 0|
| 2| 1|
| 2| 0|
| 3| 1|
+---+-----+