PySpark: заполнение столбца на основе последнего вхождения одного из значений в другом столбце - PullRequest
1 голос
/ 13 мая 2019

Используя PySpark, я ищу способ заполнения столбца Status на основе значений в столбце Code. Df упорядочен по столбцу ID.

Единственными значительными Code значениями являются A (Good), B (Bad), C (Neutral).

Когда появляется одно из этих значений, я бы хотел, чтобы каждая последующая строка имела одно и то же значение Status до тех пор, пока не появятся другие значимые значения Code.

Это желаемый вывод df с добавленным столбцом Status:

+----+------+---------+
| ID | Code | Status  |
+----+------+---------+
|  1 | A    | Good    |
|  2 | 1x4  | Good    |
|  3 | B    | Bad     |
|  4 | ytyt | Bad     |
|  5 | zix8 | Bad     |
|  6 | C    | Neutral |
|  7 | 44d  | Neutral |
|  8 | A    | Good    |
+----+------+---------+

Я не уверен, как решить эту проблему, я нашел этот вопрос, но я не знаю, можно ли адаптировать ответ к моим потребностям: PySpark Когда элемент в списке

Я думал об использовании функции задержки, но число строк между A, B and C строками нерегулярно, поэтому я не знаю, как это кусать.

Вот df для воспроизводимости:

df = sqlCtx.createDataFrame(
    [
        (1, A),
        (2, 1x4),
        (3, B),
        (4, ytyt),
        (5, zix8),
        (6, C),
        (7, 44d),
        (8, A)
    ],
    ('ID', 'Code')
)

Ответы [ 2 ]

2 голосов
/ 13 мая 2019

Сначала введите значимые значения кода, используя следующую функцию:

from pyspark.sql.functions col, lit, when

def getStatus(code):
    return when(code=="A", lit("Good"))\
        .when(code=="B", lit("Bad"))\
        .when(code=="C", lit("Neutral"))

df = df.withColumn("Status", getStatus(col("Code")))
df.show()
#+---+----+-------+
#| ID|Code| Status|
#+---+----+-------+
#|  1|   A|   Good|
#|  2| 1x4|   null|
#|  3|   B|    Bad|
#|  4|ytyt|   null|
#|  5|zix8|   null|
#|  6|   C|Neutral|
#|  7| 44d|   null|
#|  8|   A|   Good|
#+---+----+-------+

Затем используйте функцию Window, чтобы выбрать последнее ненулевое значение "Status", упорядоченное по "ID".Мы можем выбрать последнее значение, используя pyspark.sql.functions.last с ignorenulls=True.

from pyspark.sql.functions last
from pyspark.sql import Window

df = df.withColumn(
    "Status", 
    last(
        col("Status"),
        ignorenulls=True
    ).over(
        Window.partitionBy().orderBy("ID").rowsBetween(Window.unboundedPreceding, 0)
    )
)
df.show()
#+---+----+-------+
#| ID|Code| Status|
#+---+----+-------+
#|  1|   A|   Good|
#|  2| 1x4|   Good|
#|  3|   B|    Bad|
#|  4|ytyt|    Bad|
#|  5|zix8|    Bad|
#|  6|   C|Neutral|
#|  7| 44d|Neutral|
#|  8|   A|   Good|
#+---+----+-------+
1 голос
/ 13 мая 2019

Используйте when с запуском sum для определения групп (строки от первого вхождения кодов 'A', 'B' или 'C' до следующего в порядке id).Затем используйте значение first классифицированных групп в when, чтобы получить столбец состояния.

from pyspark.sql import Window
from pyspark.sql.functions import sum,when,first
w = Window.orderBy(df.id)
df_with_grp = df.withColumn('grp',sum(when(df.code.isin(['A','B','C']),1).otherwise(0)).over(w))
w1 = Window.partitionBy(df_with_grp.grp).orderBy(df_with_grp.id)
res = df_with_grp.withColumn('status',when(first(df_with_grp.code).over(w1) == 'A','Good') \
                                     .when(first(df_with_grp.code).over(w1) == 'B','Bad') \
                                     .when(first(df_with_grp.code).over(w1) == 'C','Neutral')
                            )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...