Столбец Pyspark UDF на Датафрейме - PullRequest
0 голосов
/ 26 сентября 2018

Я пытаюсь создать новый столбец в кадре данных на основе значений некоторых столбцов.Это возвращает ноль во всех случаях.Кто-нибудь знает, что не так с этим простым примером?

df = pd.DataFrame([[0,1,0],[1,0,0],[1,1,1]],columns = ['Foo','Bar','Baz'])

spark_df = spark.createDataFrame(df)

def get_profile():
    if 'Foo'==1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' ==1 :
        return 'Baz'

spark_df = spark_df.withColumn('get_profile', lit(get_profile()))
spark_df.show()

   Foo  Bar  Baz get_profile
    0    1    0        None
    1    0    0        None
    1    1    1        None

Я ожидаю, что столбец get_profile будет заполнен для всех строк.

Я также пробовал это:

spark_udf = udf(get_profile,StringType())

spark_df = spark_df.withColumn('get_profile', spark_udf())
print(spark_df.toPandas())

с тем же эффектом.

1 Ответ

0 голосов
/ 26 сентября 2018

udf не знает, что такое имена столбцов.Таким образом, он проверяет каждое из ваших условий в вашем блоке if / elif, и все они оцениваются как False.Таким образом, функция вернет None.

. Вам нужно будет переписать свой udf, чтобы взять столбцы, которые вы хотите проверить:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_profile(foo, bar, baz):
    if foo == 1:
        return 'Foo'
    elif bar == 1:
        return 'Bar'
    elif baz == 1 :
        return 'Baz'

spark_udf = udf(get_profile, StringType())
spark_df = spark_df.withColumn('get_profile',spark_udf('Foo', 'Bar', 'Baz'))
spark_df.show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  1|  0|        Bar|
#|  1|  0|  0|        Foo|
#|  1|  1|  1|        Foo|
#+---+---+---+-----------+

Если у вас много столбцов и вы хотите передать их все (по порядку):

spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))

В общем случае вы можете распаковать любой упорядоченный список столбцов:

cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']
spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf ))

Но этоДля конкретной операции не требуется udf.Я бы сделал это следующим образом:

from pyspark.sql.functions import coalesce, when, col, lit

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])
).show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  1|  0|        Bar|
#|  1|  0|  0|        Foo|
#|  1|  1|  1|        Foo|
#+---+---+---+-----------+

Это работает, потому что pyspark.sql.functions.when() вернет null по умолчанию, если условие оценивается как False, а otherwise не указано.Тогда понимание списка pyspark.sql.functions.coalesce вернет первый ненулевой столбец.

Обратите внимание, что это эквивалентно ТОЛЬКО udf, если порядок столбцов такой же, как последовательность, вычисленная в get_profile функция.Чтобы быть более точным, вы должны сделать:

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c)==1, lit(c)) for c in ['Foo', 'Bar', 'Baz'])
).show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...