Я пытаюсь применить функцию UDF сразу после создания столбца.
Но у меня возникла проблема:
Cannot resolve column name "previous_status" among
Что означает, чтостолбец не существует.
Возможно, я мог бы изменить функцию UDF, чтобы она больше не была UDF, а была просто нормальной функцией с F.when
& otherwise
. Дело в том, что мне нужен глобальный диктат, как вы можете видеть, чтобы определить, видел ли я уже этот идентификатор или нет.
alreadyAuthorized = {}
def previously_authorized_spark(id, failed, alreadyAuthorized = alreadyAuthorized):
if id in alreadyAuthorized:
previously_authorized = 1
else:
previously_authorized = 0
if not failed:
alreadyAuthorized[id] = True
return previously_authorized
previously_authorized_udf = udf(lambda x, y : previously_authorized_spark(x, y), IntegerType())
def get_previous_status(data):
partition = Window.partitionBy("id").orderBy("date")
data = data.withColumn("previous_status", F.lag(F.col("failed")).over(partition))\
.withColumn("previously_authorized", previously_authorized_udf(data["id"], data["previous_status"]))
data = get_previous_status(data)