Pyspark - функция UDF сразу после создания столбца - PullRequest
0 голосов
/ 15 октября 2019

Я пытаюсь применить функцию UDF сразу после создания столбца.

Но у меня возникла проблема:

Cannot resolve column name "previous_status" among

Что означает, чтостолбец не существует.

Возможно, я мог бы изменить функцию UDF, чтобы она больше не была UDF, а была просто нормальной функцией с F.when & otherwise. Дело в том, что мне нужен глобальный диктат, как вы можете видеть, чтобы определить, видел ли я уже этот идентификатор или нет.

alreadyAuthorized = {}

def previously_authorized_spark(id, failed, alreadyAuthorized = alreadyAuthorized):
    if id in alreadyAuthorized:
        previously_authorized = 1
    else:
        previously_authorized = 0

    if not failed:
        alreadyAuthorized[id] = True

    return previously_authorized

previously_authorized_udf = udf(lambda x, y : previously_authorized_spark(x, y), IntegerType())

def get_previous_status(data):
    partition = Window.partitionBy("id").orderBy("date")

    data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
                .withColumn("previously_authorized", previously_authorized_udf(data["id"], data["previous_status"]))

data = get_previous_status(data)

1 Ответ

1 голос
/ 15 октября 2019

Попробуйте использовать функцию col для получения столбца, потому что, как указывало @LaSul, вы используете data до того, как он был назначен:

from pyspark.sql.function import col

...
    data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
                .withColumn("previously_authorized", previously_authorized_udf(col("id"), col("previous_status")))

...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...