Оконная функция и условные фильтры в PySpark - PullRequest
0 голосов
/ 21 ноября 2019

Есть ли способ условно применить фильтр к оконной функции в pyspark? Для каждой группы в col1 я хочу сохранить только те строки, которые имеют X в col2. Если в группе нет X в col2, я хочу сохранить все строки в этой группе.

+------+------+
| col1 | col2 |
+------+------+
| A    |      |
+------+------+
| A    | X    |
+------+------+
| A    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+
| B    |      |
+------+------+

Ответы [ 2 ]

2 голосов
/ 21 ноября 2019

Вы можете сделать это с помощью оконной функции max, чтобы обозначить группу (разделенную на col1), которая имеет 'X' в col2 с идентификатором (в данном случае 1). Группам, у которых нет 'X', присваивается null. После этого просто отфильтруйте промежуточный кадр данных, чтобы получить желаемый результат.

from pyspark.sql import Window
from pyspark.sql.functions import max,when
w = Window.partitionBy(df.col1)
df_1 = df.withColumn('x_exists',max(when(df.col2 == 'X',1)).over(w))
df_2 = df_1.filter(((df_1.x_exists == 1) & (df_1.col2 == 'X')) | df_1.x_exists.isNull())
df_2.show()
1 голос
/ 21 ноября 2019

Альтернативно использование collect_list с большим синтаксисом SQL: NULL значение пропускается для collect_list, мы используем if(col2='X',1,NULL) в качестве элемента списка, так что, когда в столбце col2 отсутствует 'X', размер этого collect_list равен ZERO:

from pyspark.sql.functions import expr                                                                              

df_new = df.withColumn('has_X', expr("size(collect_list(if(col2='X',1,NULL)) OVER (partition by col1))>0")) \
           .filter("col2 = 'X' OR !has_X")
...