Шаг 1: Создание DataFrame
.
from pyspark.sql.functions import when, col, lit
df = spark.createDataFrame(
[('aaa','up',24,'k-123'),('bbb','down',45,'i-98'),('ccc','down',54,'k-89'),
('fff','int', 23,'l-34'),('xyz','up',22,'o-89'),('www','up',89,'u-56')],
schema = ['Part1','Part2','Part3','Part4']
)
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
| aaa| up| 24|k-123|
| bbb| down| 45| i-98|
| ccc| down| 54| k-89|
| fff| int| 23| l-34|
| xyz| up| 22| o-89|
| www| up| 89| u-56|
+-----+-----+-----+-----+
Шаг 2: Сначала нам нужно найти первое вхождение down
и удалитьвсе ряды над ним.Для этого мы создаем столбец cumulative
, со значением 1, если Part2
== down
и 0 в противном случае, и, наконец, берём кумулятивную сумму этого столбца.
df = df.withColumn('Dummy',lit('dummy'))
df = df.withColumn('cumulative',when(col('Part2')=='down',1).otherwise(0))
df = df.selectExpr(
'Part1','Part2','Part3','Part4','Dummy',
'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
)
df.show()
+-----+-----+-----+-----+-----+----------+
|Part1|Part2|Part3|Part4|Dummy|cumulative|
+-----+-----+-----+-----+-----+----------+
| aaa| up| 24|k-123|dummy| 0|
| bbb| down| 45| i-98|dummy| 1|
| ccc| down| 54| k-89|dummy| 2|
| fff| int| 23| l-34|dummy| 2|
| xyz| up| 22| o-89|dummy| 2|
| www| up| 89| u-56|dummy| 2|
+-----+-----+-----+-----+-----+----------+
Теперь удалите все строкигде накопленная сумма равна 0. Это удалит все строки до тех пор, пока down
не появится в первый раз.
df = df.where(col('cumulative')>=1)
Шаг 3: Сделайте то же самое, что и в шаге 2 выше, за исключением того, чтосделайте это для up
и удалите все строки, где значение в столбце cumulative
меньше или равно 1. Таким образом, мы удалим все строки ниже первого вхождения up
.
df = df.withColumn('cumulative',when(col('Part2')=='up',1).otherwise(0))
df = df.selectExpr(
'Part1','Part2','Part3','Part4','Dummy',
'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
)
df = df.where(col('cumulative')<=1).drop('Dummy','cumulative')
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
| bbb| down| 45| i-98|
| ccc| down| 54| k-89|
| fff| int| 23| l-34|
| xyz| up| 22| o-89|
+-----+-----+-----+-----+