Как создать pyspark udf, вызывая функцию класса из другой функции класса в том же файле? - PullRequest
0 голосов
/ 22 марта 2020

Я создаю pyspark udf внутри представления на основе классов, и у меня есть функция, которую я хочу вызвать, внутри другого представления на основе классов, оба они находятся в одном файле (api.py), но когда я проверяю В результате содержимое информационного кадра, я получаю эту ошибку:

ModuleNotFoundError: No module named 'api'

Я не могу понять, почему это происходит, я попытался сделать подобный код в консоли pyspark, и он работал хорошо. Подобный вопрос был задан здесь , но разница в том, что я пытаюсь сделать это в том же файле.

Это часть моего полного кода: api.py

class TextMiningMethods():
    def clean_tweet(self,tweet):
        '''
        some logic here
        '''
        return "Hello: "+tweet


class BigDataViewSet(TextMiningMethods,viewsets.ViewSet):

    @action(methods=['post'], detail=False)
    def word_cloud(self, request, *args, **kwargs): 
        '''
        some previous logic here
        '''
        spark=SparkSession \
            .builder \
            .master("spark://"+SPARK_WORKERS) \
            .appName('word_cloud') \
            .config("spark.executor.memory", '2g') \
            .config('spark.executor.cores', '2') \
            .config('spark.cores.max', '2') \
            .config("spark.driver.memory",'2g') \
            .getOrCreate()

        sc.sparkContext.addPyFile('path/to/udfFile.py')
        cols = ['text']
        rows = []

        for tweet_account_index, tweet_account_data in enumerate(tweets_list):

            tweet_data_aux_pandas_df = pd.Series(tweet_account_data['tweet']).dropna()
            for tweet_index,tweet in enumerate(tweet_data_aux_pandas_df):
                row= [tweet['text']]
                rows.append(row)

        # Create a Pandas Dataframe of tweets
        tweet_pandas_df = pd.DataFrame(rows, columns = cols)

        schema = StructType([
            StructField("text", StringType(),True)
        ])

        # Converts to Spark DataFrame
        df = spark.createDataFrame(tweet_pandas_df,schema=schema)
        clean_tweet_udf = udf(TextMiningMethods().clean_tweet, StringType())
        clean_tweet_df = df.withColumn("clean_tweet", clean_tweet_udf(df["text"]))
        clean_tweet_df.show()   # This line produces the error

Этот похожий тест в pyspark работает хорошо

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import udf
def clean_tweet(name):
    return "This is " + name

schema = StructType([StructField("Id", IntegerType(),True),StructField("tweet", StringType(),True)])

data = [[ 1, "tweet 1"],[2,"tweet 2"],[3,"tweet 3"]]
df = spark.createDataFrame(data,schema=schema)

clean_tweet_udf = udf(clean_tweet,StringType())
clean_tweet_df = df.withColumn("clean_tweet", clean_tweet_udf(df["tweet"]))
clean_tweet_df.show()

Так вот мои вопросы:

  1. С чем связана эта ошибка? и как я могу это исправить?
  2. Как правильно создать pyspark udf при работе с представлением на основе классов? Это неправильная практика - писать функции, которые вы будете использовать как pyspark udf, в том же файле, где вы будете их вызывать? (в моем случае все мои конечные точки API, работающие с django остальные рамки)

Любая помощь будет оценена, заранее спасибо

ОБНОВЛЕНИЕ:

Эта ссылка и ссылка объясняют, как использовать пользовательские классы с pyspark с использованием SparkContext, но не с SparkSession, как в моем случае, но я использовал это:

sc.sparkContext.addPyFile('path/to/udfFile.py')

Проблема в том, что я определил класс, в котором у меня есть функции для использования в качестве pyspark udf, в том же файле, где я создаю функцию udf для фрейма данных (как показано в моем коде). Я не смог найти, как достичь этого поведения, когда путь addPyFile () находится в том же коде . Несмотря на это, я переместил свой код и выполнил эти шаги (это была еще одна ошибка, которую я исправил):

  • Создайте новую папку с именем udf
  • Создайте новый пустой __ini__.py файл, чтобы создать каталог для пакета.
  • И создайте файл file.py для моих функций udf.
core/
    udf/
    ├── __init__.py
    ├── __pycache__
    └── pyspark_udf.py
    api/
    ├── admin.py
    ├── api.py
    ├── apps.py
    ├── __init__.py

В этом файле я попытался импортировать зависимости либо в начале, либо внутри функции. Во всех случаях я получаю ModuleNotFoundError: No module named 'udf'

pyspark_udf.py

import re
import string
import unidecode
from nltk.corpus import stopwords

class TextMiningMethods():
    """docstring for TextMiningMethods"""
    def clean_tweet(self,tweet):
        # some logic here

Я пытался со всеми из них, В начале моего api.py файла

from udf.pyspark_udf import TextMiningMethods

# or

from udf.pyspark_udf import *

А внутри функции word_cloud

class BigDataViewSet(viewsets.ViewSet):
    def word_cloud(self, request, *args, **kwargs):
        from udf.pyspark_udf import TextMiningMethods

В отладчике python эта строка работает:

from udf.pyspark_udf import TextMiningMethods

Но когда я показываю фрейм данных, я получаю ошибку:

clean_tweet_df.show()

ModuleNotFoundError: No module named 'udf'

Очевидно, исходная проблема изменилась на другую, теперь моя проблема больше связана с этим вопросом , но я пока не смог найти удовлетворительный способ импортировать файл и создать pyspark. udf вызывает функцию класса из другой функции класса.

Что мне не хватает?

1 Ответ

0 голосов
/ 02 апреля 2020

После разных попыток я не мог найти решение, ссылаясь на метод по пути addPyFile(), расположенный в том же файле, где я создавал udf (я хотел бы знать, если это плохая практика ) или в другом файле, технически addPyFile (путь) документация гласит:

Добавить зависимость .py или .zip для всех задач, которые должны быть выполнены на это SparkContext в будущем. Переданный путь может быть локальным файлом, файлом в HDFS (или другой файловой системе, поддерживающей Had oop) или URI HTTP, HTTPS или FTP.

Так что я должен упомянуть возможный. Исходя из этого, мне пришлось использовать это решение и сжать всю папку udf с самого высокого уровня с помощью:

zip -r udf.zip udf

Кроме того, в pyspark_udf.py Мне пришлось импортировать мои зависимости, как показано ниже, чтобы избежать этой проблемы

class TextMiningMethods():
    """docstring for TextMiningMethods"""
    def clean_tweet(self,tweet):
        import re
        import string
        import unidecode
        from nltk.corpus import stopwords

Вместо:

import re
import string
import unidecode
from nltk.corpus import stopwords

class TextMiningMethods():
    """docstring for TextMiningMethods"""
    def clean_tweet(self,tweet):

Затем, наконец, эта строка работала хорошо:

clean_tweet_df.show()

Надеюсь, это может пригодиться кому-то еще

...