Ошибка Pyspark, когда UDF определен вне функции, которая вызывает его: Метод __getnewargs __ ([]) не существует - PullRequest
0 голосов
/ 05 декабря 2018

Я видел несколько вопросов по этому поводу, но, похоже, я не понимаю, почему я получаю эту ошибку, когда мой UDF определен вне функции, которую я вызываю на моем фрейме данных.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from data.utils import PropertiesGetter

glueContext = GlueContext(SparkContext.getOrCreate())

input_source = glueContext.create_dynamic_frame.from_catalog(database = "db_name", table_name = "input")
input_source_df = input_source.toDF()

test_df = PropertiesGetter(glueContext).add_subscription_properties(input_df)

Вызов add_subscription_properties из PropertiesGetter для моего input_df НЕ выдает ошибку, когда мой класс выглядит следующим образом (обратите внимание на вложенный UDF):

class PropertiesGetter(object):
    def __init__(self, gc):
        ...

    def add_subscription_properties(self, input_df):
        def _add_subscription_properties(self, subscription_name):
            subscription_mapping = {...}
            return subscription_mapping[subscription_name]
        udf_add_subscription_properties = udf(_add_subscription_properties, StringType())
        return input_df.withColumn("subscription_properties", 
                                   udf_add_subscription_properties("subscription_type"))

    ...

Но он выдает ошибку(в частности, Could not serialize object: Py4JError: An error occurred while calling o116.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist..), когда это выглядит так:

class PropertiesGetter(object):
    def __init__(self, gc):
        ...

    def _add_subscription_properties(self, subscription_name):
        subscription_mapping = {...}
        return subscription_mapping[subscription_name]

    def add_subscription_properties(self, input_df):
        udf_add_subscription_properties = udf(self._add_subscription_properties, StringType())
        return input_df.withColumn("subscription_properties", 
                                   udf_add_subscription_properties("subscription_type"))

    ...

Может кто-нибудь объяснить мне, почему это так?Я изо всех сил пытаюсь понять, почему это имеет значение.У меня есть несколько UDF, которые я использую в этом классе, поэтому я хочу знать, как можно обойти некоторые из этих UDF.

PS Я знаю, вам не нужна UDF для создания столбца, который применяет сопоставление, но вы просто хотели продемонстрировать это на простом примере.

...