PySpark - Как установить значение по умолчанию для pyspark.sql.functions.lag в значение в текущей строке? - PullRequest
0 голосов
/ 20 сентября 2018

Как установить значение по умолчанию для pyspark.sql.functions.lag на значение в текущей строке?

Например, задано:

testInput = [(1, 'a'),(2, 'c'),(3, 'e'),(1, 'a'),(1, 'b'),(1, 'b')]
columns = ['Col A', 'Col B']
df = sc.parallelize(testInput).toDF(columns)
df.show()

windowSpecification = Window.partitionBy(col('Col A')).orderBy(col('Col B'))
changedRows = col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification)

df.select(col('Col A'), col('Col B'), changedRows.alias('New Col C')).show()

, которое выдает:

+-----+-----+
|Col A|Col B|
+-----+-----+
|    1|    a|
|    2|    c|
|    3|    e|
|    1|    a|
|    1|    b|
|    1|    b|
+-----+-----+

+-----+-----+---------+
|Col A|Col B|New Col C|
+-----+-----+---------+
|    1|    a|     null|
|    1|    a|    false|
|    1|    b|     true|
|    1|    b|    false|
|    3|    e|     null|
|    2|    c|     null|
+-----+-----+---------+

Я бы хотел, чтобы результат выглядел следующим образом:

+-----+-----+---------+
|Col A|Col B|New Col C|
+-----+-----+---------+
|    1|    a|    false|
|    1|    a|    false|
|    1|    b|     true|
|    1|    b|    false|
|    3|    e|    false|
|    2|    c|    false|
+-----+-----+---------+

Мой текущий способ - добавить второй lag вызов к changedRows, например:

changedRows = (col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification)) & F.lag(col('Col B'), 1).over(windowSpecification).isNotNull()

но мне это не кажется чистым.

Я бы хотел сделать что-то вроде

changedRows = col('Col B') != F.lag(col('Col B'), 1, col('Col B')).over(windowSpecification)

, но я получаю ошибку TypeError: 'Column' object is not callable.

1 Ответ

0 голосов
/ 20 сентября 2018

Вы можете использовать значения столбца в качестве параметров , если вы используете pyspark.sql.functions.expr.В вашем случае внесите следующие изменения в changedRows:

changedRows = F.expr(
    "`Col B` != lag(`Col B`, 1, `Col B`) over (PARTITION BY `Col A` ORDER BY `Col B`)"
)
df.select('Col A', 'Col B', changedRows.alias('New Col C')).show()
#+-----+-----+---------+
#|Col A|Col B|New Col C|
#+-----+-----+---------+
#|    1|    a|    false|
#|    1|    a|    false|
#|    1|    b|     true|
#|    1|    b|    false|
#|    3|    e|    false|
#|    2|    c|    false|
#+-----+-----+---------+

Вы должны ссылаться на имена столбцов в обратных тиках из-за пробела.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...