Давайте создадим фиктивный DataFrame
. Цель состоит в том, чтобы иметь только DataFrame со столбцами B,C & D
.
from pyspark.sql.functions import count, when
valuesCol = [(None,21,12,None,1),(8,None,3,1,None),(None,40,None,10,None),(None,2,None,6,None),(None,8,0,2,None)]
df = sqlContext.createDataFrame(valuesCol,['A','B','C','D','E'])
df.show()
+----+----+----+----+----+
| A| B| C| D| E|
+----+----+----+----+----+
|null| 21| 12|null| 1|
| 8|null| 3| 1|null|
|null| 40|null| 10|null|
|null| 2|null| 6|null|
|null| 8| 0| 2|null|
+----+----+----+----+----+
Теперь давайте агрегируем DataFrame и collect()
его. Collect
- это action
, который возвращает все элементы DataFrame
в виде массива в программе драйвера.
aggregated_row = df.select([(count(when(col(c).isNull(), c))/df.count()).alias(c) for c in df.columns]).collect()
aggregated_row
[Row(A=0.8, B=0.2, C=0.4, D=0.2, E=0.8)]
Преобразование объекта Row()
в list
из dictionary
-
aggregated_dict_list = [row.asDict() for row in aggregated_row]
aggregated_dict = aggregated_dict_list[0]
{'D': 0.2, 'A': 0.8, 'C': 0.4, 'E': 0.8, 'B': 0.2}
Наконец, с помощью dictionary comprehensions
найдите те столбцы, в которых значения Null
составляют более 75% от общего числа, а затем отбросьте эти столбцы -
col_null_g_75p=list({i for i in aggregated_dict if aggregated_dict[i] > 0.75})
print(col_null_g_75p)
['A', 'E']
df = df.drop(*col_null_g_75p)
df.show()
+----+----+----+
| B| C| D|
+----+----+----+
| 21| 12|null|
|null| 3| 1|
| 40|null| 10|
| 2|null| 6|
| 8| 0| 2|
+----+----+----+