Как установить значение по умолчанию для pyspark.sql.functions.lag
на значение в текущей строке?
Например, задано:
testInput = [(1, 'a'),(2, 'c'),(3, 'e'),(1, 'a'),(1, 'b'),(1, 'b')]
columns = ['Col A', 'Col B']
df = sc.parallelize(testInput).toDF(columns)
df.show()
windowSpecification = Window.partitionBy(col('Col A')).orderBy(col('Col B'))
changedRows = col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification)
df.select(col('Col A'), col('Col B'), changedRows.alias('New Col C')).show()
, которое выдает:
+-----+-----+
|Col A|Col B|
+-----+-----+
| 1| a|
| 2| c|
| 3| e|
| 1| a|
| 1| b|
| 1| b|
+-----+-----+
+-----+-----+---------+
|Col A|Col B|New Col C|
+-----+-----+---------+
| 1| a| null|
| 1| a| false|
| 1| b| true|
| 1| b| false|
| 3| e| null|
| 2| c| null|
+-----+-----+---------+
Я бы хотел, чтобы результат выглядел следующим образом:
+-----+-----+---------+
|Col A|Col B|New Col C|
+-----+-----+---------+
| 1| a| false|
| 1| a| false|
| 1| b| true|
| 1| b| false|
| 3| e| false|
| 2| c| false|
+-----+-----+---------+
Мой текущий способ - добавить второй lag
вызов к changedRows
, например:
changedRows = (col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification)) & F.lag(col('Col B'), 1).over(windowSpecification).isNotNull()
но мне это не кажется чистым.
Я бы хотел сделать что-то вроде
changedRows = col('Col B') != F.lag(col('Col B'), 1, col('Col B')).over(windowSpecification)
, но я получаю ошибку TypeError: 'Column' object is not callable
.