Pyspark: передать несколько столбцов вместе с аргументом в UDF - PullRequest
0 голосов
/ 16 октября 2018

Я пишу udf, который будет принимать два столбца данных вместе с дополнительным параметром (постоянное значение) и должен добавить новый столбец в кадр данных.Моя функция выглядит так:

def udf_test(column1, column2, constant_var):
    if column1 == column2:
        return column1
    else:
        return constant_var

также, я делаю следующее для передачи в несколько столбцов:

apply_test = udf(udf_test, StringType())
df = df.withColumn('new_column', apply_test('column1', 'column2'))

Это не работает сейчас, пока я не удалю constant_var какмои функции третий аргумент, но мне это действительно нужно.Поэтому я попытался сделать что-то вроде следующего:

constant_var = 'TEST'
apply_test = udf(lambda x: udf_test(x, constant_var), StringType())
df = df.withColumn('new_column', apply_test(constant_var)(col('column1', 'column2')))

и

apply_test = udf(lambda x,y: udf_test(x, y, constant_var), StringType())

Ничто из вышеперечисленного не помогло мне.Я получил эти идеи, основываясь на , и , сообщениях stackoverflow, и я думаю, что очевидно, чем мой вопрос отличается от обоих.Любая помощь будет принята с благодарностью.

ПРИМЕЧАНИЕ: Я упростил функцию здесь только для обсуждения, и фактическая функция более сложна.Я знаю, что эту операцию можно выполнить с помощью операторов when и otherwise.

1 Ответ

0 голосов
/ 16 октября 2018

Вам не нужно использовать пользовательскую функцию.Вы можете использовать функции когда () и в противном случае () :

from pyspark.sql import functions as f
df = df.withColumn('new_column', 
                   f.when(f.col('col1') == f.col('col2'), f.col('col1'))
                    .otherwise('other_value'))

Другой способ сделать это - сгенерировать пользовательскую функцию.Однако использование udf оказывает негативное влияние на производительность, поскольку данные должны быть (де) сериализованы в и из python.Чтобы сгенерировать пользовательскую функцию, вам нужна функция, которая возвращает (пользовательскую) функцию.Например:

def generate_udf(constant_var):
    def test(col1, col2):
        if col1 == col2:
            return col1
        else:
            return constant_var
    return f.udf(test, StringType())

df = df.withColumn('new_column', 
                   generate_udf('default_value')(f.col('col1'), f.col('col2')))
...