PySpark's where/filter will not accept a Python list as the condition. It accepts either a SQL expression string or a Column condition.
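For reference, the two accepted forms look roughly like this (a minimal sketch; colA is just a column name from the example data below):

df.filter("colA is not null")           # SQL expression string
df.filter(F.col("colA").isNotNull())    # Column condition (pyspark.sql.functions imported as F)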
The way you tried it will not work; you need to restructure things a bit. Below are 2 approaches for doing that -
from pyspark.sql import functions as F

# sample data with some nulls
data = [("ID1", 3, None), ("ID2", 4, 12), ("ID3", None, 3)]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()
Approach 1
# change df_name below if your DataFrame variable has a different name
df_name = "df"
my_condition_list = ["%s['%s'].isNotNull()" % (df_name, c) for c in df.columns]
print(my_condition_list[0])
df['ID'].isNotNull()
print(" & ".join(my_condition_list))
df['ID'].isNotNull() & df['colA'].isNotNull() & df['colB'].isNotNull()
print(eval(" & ".join(my_condition_list)))
Column<b'(((ID IS NOT NULL) AND (colA IS NOT NULL)) AND (colB IS NOT NULL))'>
df.filter(eval(" & ".join(my_condition_list))).show()
+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID2| 4| 12|
+---+----+----+
df.filter(eval(" | ".join(my_condition_list))).show()
+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1| 3|null|
|ID2| 4| 12|
|ID3|null| 3|
+---+----+----+
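If you want to avoid eval, the same AND / OR combinations can be built directly from Column objects with functools.reduce; this is a sketch of that alternative, not part of the original two approaches:

from functools import reduce
from operator import and_, or_

# build one Column condition per column, then fold them together
not_null_cols = [F.col(c).isNotNull() for c in df.columns]
df.filter(reduce(and_, not_null_cols)).show()  # same result as the " & " join above
df.filter(reduce(or_, not_null_cols)).show()   # same result as the " | " join above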
Approach 2
my_condition_list = ["%s is not null"%c for c in df.columns]
print (my_condition_list[0])
'ID is not null'
print (" and ".join(my_condition_list))
'ID is not null and colA is not null and colB is not null'
df.filter(" and ".join(my_condition_list)).show()
+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID2| 4| 12|
+---+----+----+
df.filter(" or ".join(my_condition_list)).show()
+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1| 3|null|
|ID2| 4| 12|
|ID3|null| 3|
+---+----+----+
The preferred approach is Approach 2, since it avoids eval.
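As a side note (not from the original answer), for this particular null-filtering case PySpark's dropna shortcuts give the same results without building a condition list at all:

# drop rows containing any null -> keeps rows where every column is non-null (the "and" case)
df.na.drop(how="any").show()

# drop rows where all values are null -> keeps rows with at least one non-null column (the "or" case)
df.na.drop(how="all").show()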