Вы проверяли только столбец test, но забыли проверить столбец cnty.Для этого вам понадобится * другой WindowSpec для столбца cnty и объедините оба windowSpecs, используя встроенную функцию when
, чтобы получить желаемый результат
from pyspark.sql import window as w
windowSpec1 = w.Window.partitionBy('tests').orderBy('asd')
windowSpec2 = w.Window.partitionBy('cnty').orderBy('asd')
from pyspark.sql import functions as f
df = df.select(f.col('tests'), f.when(f.max('val').over(windowSpec1)== 'Y', 'Y').otherwise(f.when(f.max('val').over(windowSpec2)== 'Y', 'Y').otherwise(f.col('val'))).alias('val'), f.col('asd'), f.col('cnty'))
df.show(truncate=False)
, который должен дать вам
+-----+---+---+----+
|tests|val|asd|cnty|
+-----+---+---+----+
|test4|Y |5 |null|
|test3|N |4 |AUS |
|test1|Y |1 |null|
|test1|Y |2 |null|
|test1|Y |3 |null|
|test2|Y |2 |UK |
|null |Y |1 |UK |
+-----+---+---+----+
Я надеюсьэто объясняет, почему вы не получаете желаемый результат.
Обновление
Приведенное выше решение требует одновременного запуска обеих функций window
, что может привести к некоторымпроблемы с памятью.Вы можете запустить одну функцию window
для проверки столбцов tests
и cnty
по отдельности, так как
from pyspark.sql import window as w
windowSpec1 = w.Window.partitionBy('tests').orderBy('asd')
windowSpec2 = w.Window.partitionBy('cnty').orderBy('asd')
from pyspark.sql import functions as f
df = df.withColumn('val', f.when(f.max('val').over(windowSpec1)== 'Y', 'Y').otherwise(f.col('val')))\
.withColumn('val', f.when(f.max('val').over(windowSpec2)== 'Y', 'Y').otherwise(f.col('val')))
. Это приведет к тому же результату.