PySpark - Как отфильтровать последовательный кусок строк из фрейма данных на основе двух значений в столбце - PullRequest
0 голосов
/ 01 февраля 2019

У меня есть один фрейм данных, и я хочу создать другой фрейм данных на основе значений некоторых столбцов, используя pyspark.Например: ниже мой основной фрейм данных -

Part1   Part2   Part3   Part4
aaa      up      24     k-123
bbb     down     45     i-98
ccc     down     54     k-89
fff     int      23     l-34
xyz      up      22     o-89
www      up      89     u-56

Теперь я хочу создать еще один фрейм данных, который будет искать 1-е вхождение «вниз» и идти до 1-го вхождения «вверх».Итак, ожидаемый фрейм данных будет:

   Part1    Part2   Part3   Part4
    bbb     down     45     i-98
    ccc     down     54     k-89
    fff     int      23     l-34
    xyz      up      22     o-89

1 Ответ

0 голосов
/ 02 февраля 2019

Шаг 1: Создание DataFrame.

from pyspark.sql.functions import when, col, lit
df = spark.createDataFrame(
    [('aaa','up',24,'k-123'),('bbb','down',45,'i-98'),('ccc','down',54,'k-89'),
     ('fff','int', 23,'l-34'),('xyz','up',22,'o-89'),('www','up',89,'u-56')], 
    schema = ['Part1','Part2','Part3','Part4']
)
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
|  aaa|   up|   24|k-123|
|  bbb| down|   45| i-98|
|  ccc| down|   54| k-89|
|  fff|  int|   23| l-34|
|  xyz|   up|   22| o-89|
|  www|   up|   89| u-56|
+-----+-----+-----+-----+

Шаг 2: Сначала нам нужно найти первое вхождение down и удалитьвсе ряды над ним.Для этого мы создаем столбец cumulative, со значением 1, если Part2 == down и 0 в противном случае, и, наконец, берём кумулятивную сумму этого столбца.

df = df.withColumn('Dummy',lit('dummy'))
df = df.withColumn('cumulative',when(col('Part2')=='down',1).otherwise(0))
df = df.selectExpr(
    'Part1','Part2','Part3','Part4','Dummy',
    'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
 )
df.show()
+-----+-----+-----+-----+-----+----------+
|Part1|Part2|Part3|Part4|Dummy|cumulative|
+-----+-----+-----+-----+-----+----------+
|  aaa|   up|   24|k-123|dummy|         0|
|  bbb| down|   45| i-98|dummy|         1|
|  ccc| down|   54| k-89|dummy|         2|
|  fff|  int|   23| l-34|dummy|         2|
|  xyz|   up|   22| o-89|dummy|         2|
|  www|   up|   89| u-56|dummy|         2|
+-----+-----+-----+-----+-----+----------+

Теперь удалите все строкигде накопленная сумма равна 0. Это удалит все строки до тех пор, пока down не появится в первый раз.

df = df.where(col('cumulative')>=1)

Шаг 3: Сделайте то же самое, что и в шаге 2 выше, за исключением того, чтосделайте это для up и удалите все строки, где значение в столбце cumulative меньше или равно 1. Таким образом, мы удалим все строки ниже первого вхождения up.

df = df.withColumn('cumulative',when(col('Part2')=='up',1).otherwise(0))
df = df.selectExpr(
    'Part1','Part2','Part3','Part4','Dummy',
    'sum(cumulative) over (order by row_number() over (order by Dummy)) as cumulative'
 )
df = df.where(col('cumulative')<=1).drop('Dummy','cumulative')
df.show()
+-----+-----+-----+-----+
|Part1|Part2|Part3|Part4|
+-----+-----+-----+-----+
|  bbb| down|   45| i-98|
|  ccc| down|   54| k-89|
|  fff|  int|   23| l-34|
|  xyz|   up|   22| o-89|
+-----+-----+-----+-----+
...