Вы можете сделать это без udf
. Вот один из способов переписать ваш код и получить специальную корзину для значений null
:
from pyspark.sql.functions import col, when
def a(b):
return when(b.isNull(), "Other")\
.when(b <= 20, "<= 20")\
.when(b <= 40, "20 < <= 40")\
.when(b <= 45, "40 < <= 45")\
.otherwise("> 45")
data = data.withColumn("a_bucket", a(col("b")))
Однако более общее решение позволит вам передать список блоков и динамически вернуть вывод корзины (не проверено):
from functools import reduce
def aNew(b, buckets):
"""assumes buckets are sorted"""
if not buckets:
raise ValueError("buckets can not be empty")
return reduce(
lambda w, i: w.when(
b.between(buckets[i-1], buckets[i]),
"{low} < <= {high}".format(low=buckets[i-1], high=buckets[i]))
),
range(1, len(buckets)),
when(
b.isNull(),
"Other"
).when(
b <= buckets[0],
"<= {first}".format(first=buckets[0])
)
).otherwise("> {last}".format(last=buckets[-1]))
data = data.withColumn("a_bucket", aNew(col("b"), buckets=[20, 40, 45]))