Эквивалент PySpark для лямбда-функции в UDF Pandas - PullRequest
1 голос
/ 19 сентября 2019

Я написал коды предварительной обработки данных в Pandas UDF в PySpark.Я использую лямбда-функцию, чтобы извлечь часть текста из всех записей столбца.

Вот как выглядит мой код:

@pandas_udf("string", PandasUDFType.SCALAR)
def get_X(col):
      return col.apply(lambda x: x.split(',')[-1] if len(x.split(',')) > 0 else x)

df = df.withColumn('X', get_first_name(df.Y))

Это работает нормально и дает желаемые результаты.Но мне нужно написать ту же часть логики в эквивалентном Spark-коде.Есть ли способ сделать это?Спасибо.

Ответы [ 3 ]

2 голосов
/ 19 сентября 2019

Я думаю, что для этой конкретной задачи достаточно одной функции substring_index :

from pyspark.sql.functions import substring_index

df = spark.createDataFrame([(x,) for x in ['f,l', 'g', 'a,b,cd']], ['c1'])

df2.withColumn('c2', substring_index('c1', ',', -1)).show()                                                                 
+------+---+
|    c1| c2|
+------+---+
|   f,l|  l|
|     g|  g|
|a,b,cd| cd|
+------+---+
0 голосов
/ 19 сентября 2019

С учетом следующего кадра данных df:

df.show()
# +-------------+
# |     BENF_NME|
# +-------------+
# |    Doe, John|
# |          Foo|
# |Baz, Quux,Bar|
# +-------------+

Вы можете просто использовать regexp_extract() для выбора имени:

from pyspark.sql.functions import regexp_extract
df.withColumn('First_Name', regexp_extract(df.BENF_NME, r'(?:.*,\s*)?(.*)', 1)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

Если выне заботясь о возможных начальных пробелах, substring_index() предоставляет простую альтернативу вашей исходной логике:

from pyspark.sql.functions import substring_index
df.withColumn('First_Name', substring_index(df.BENF_NME, ',', -1)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

В этом случае First_Name в первой строке имеет начальный пробел:

df.withColumn(...).collect()[0]
# Row(BENF_NME=u'Doe, John', First_Name=u' John'

Если вы все еще хотите использовать пользовательскую функцию, вам нужно создать пользовательскую функцию (UDF), используя udf():

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
get_first_name = udf(lambda s: s.split(',')[-1], StringType())
df.withColumn('First_Name', get_first_name(df.BENF_NME)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

Обратите внимание, что пользовательские функции работают медленнее, чем встроенные функции Spark, особенно пользовательские функции Python.

0 голосов
/ 19 сентября 2019

Вы можете сделать то же самое, используя when для реализации логики if-then-else :

First split theстолбец, а затем вычислить его size.Если размер больше 0, берет последний элемент из массива разбиения .В противном случае верните исходный столбец.

from pyspark.sql.functions import split, size, when

def get_first_name(col):
    col_split = split(col, ',')
    split_size = size(col_split)
    return when(split_size > 0, col_split[split_size-1]).otherwise(col)

В качестве примера предположим, что у вас был следующий DataFrame:

df.show()
#+---------+
#| BENF_NME|
#+---------+
#|Doe, John|
#|  Madonna|
#+---------+

Вы можете вызвать новую функцию так же, как и раньше:

df = df.withColumn('First_Name', get_first_name(df.BENF_NME))
df.show()
#+---------+----------+
#| BENF_NME|First_Name|
#+---------+----------+
#|Doe, John|      John|
#|  Madonna|   Madonna|
#+---------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...