udf
не знает, что такое имена столбцов.Таким образом, он проверяет каждое из ваших условий в вашем блоке if
/ elif
, и все они оцениваются как False
.Таким образом, функция вернет None
.
. Вам нужно будет переписать свой udf
, чтобы взять столбцы, которые вы хотите проверить:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def get_profile(foo, bar, baz):
if foo == 1:
return 'Foo'
elif bar == 1:
return 'Bar'
elif baz == 1 :
return 'Baz'
spark_udf = udf(get_profile, StringType())
spark_df = spark_df.withColumn('get_profile',spark_udf('Foo', 'Bar', 'Baz'))
spark_df.show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#| 0| 1| 0| Bar|
#| 1| 0| 0| Foo|
#| 1| 1| 1| Foo|
#+---+---+---+-----------+
Если у вас много столбцов и вы хотите передать их все (по порядку):
spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))
В общем случае вы можете распаковать любой упорядоченный список столбцов:
cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']
spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf ))
Но этоДля конкретной операции не требуется udf
.Я бы сделал это следующим образом:
from pyspark.sql.functions import coalesce, when, col, lit
spark_df.withColumn(
"get_profile",
coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])
).show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#| 0| 1| 0| Bar|
#| 1| 0| 0| Foo|
#| 1| 1| 1| Foo|
#+---+---+---+-----------+
Это работает, потому что pyspark.sql.functions.when()
вернет null
по умолчанию, если условие оценивается как False
, а otherwise
не указано.Тогда понимание списка pyspark.sql.functions.coalesce
вернет первый ненулевой столбец.
Обратите внимание, что это эквивалентно ТОЛЬКО udf
, если порядок столбцов такой же, как последовательность, вычисленная в get_profile
функция.Чтобы быть более точным, вы должны сделать:
spark_df.withColumn(
"get_profile",
coalesce(*[when(col(c)==1, lit(c)) for c in ['Foo', 'Bar', 'Baz'])
).show()