Я создаю pyspark udf внутри представления на основе классов, и у меня есть функция, которую я хочу вызвать, внутри другого представления на основе классов, оба они находятся в одном файле (api.py
), но когда я проверяю В результате содержимое информационного кадра, я получаю эту ошибку:
ModuleNotFoundError: No module named 'api'
Я не могу понять, почему это происходит, я попытался сделать подобный код в консоли pyspark, и он работал хорошо. Подобный вопрос был задан здесь , но разница в том, что я пытаюсь сделать это в том же файле.
Это часть моего полного кода: api.py
class TextMiningMethods():
def clean_tweet(self,tweet):
'''
some logic here
'''
return "Hello: "+tweet
class BigDataViewSet(TextMiningMethods,viewsets.ViewSet):
@action(methods=['post'], detail=False)
def word_cloud(self, request, *args, **kwargs):
'''
some previous logic here
'''
spark=SparkSession \
.builder \
.master("spark://"+SPARK_WORKERS) \
.appName('word_cloud') \
.config("spark.executor.memory", '2g') \
.config('spark.executor.cores', '2') \
.config('spark.cores.max', '2') \
.config("spark.driver.memory",'2g') \
.getOrCreate()
sc.sparkContext.addPyFile('path/to/udfFile.py')
cols = ['text']
rows = []
for tweet_account_index, tweet_account_data in enumerate(tweets_list):
tweet_data_aux_pandas_df = pd.Series(tweet_account_data['tweet']).dropna()
for tweet_index,tweet in enumerate(tweet_data_aux_pandas_df):
row= [tweet['text']]
rows.append(row)
# Create a Pandas Dataframe of tweets
tweet_pandas_df = pd.DataFrame(rows, columns = cols)
schema = StructType([
StructField("text", StringType(),True)
])
# Converts to Spark DataFrame
df = spark.createDataFrame(tweet_pandas_df,schema=schema)
clean_tweet_udf = udf(TextMiningMethods().clean_tweet, StringType())
clean_tweet_df = df.withColumn("clean_tweet", clean_tweet_udf(df["text"]))
clean_tweet_df.show() # This line produces the error
Этот похожий тест в pyspark работает хорошо
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import udf
def clean_tweet(name):
return "This is " + name
schema = StructType([StructField("Id", IntegerType(),True),StructField("tweet", StringType(),True)])
data = [[ 1, "tweet 1"],[2,"tweet 2"],[3,"tweet 3"]]
df = spark.createDataFrame(data,schema=schema)
clean_tweet_udf = udf(clean_tweet,StringType())
clean_tweet_df = df.withColumn("clean_tweet", clean_tweet_udf(df["tweet"]))
clean_tweet_df.show()
Так вот мои вопросы:
- С чем связана эта ошибка? и как я могу это исправить?
- Как правильно создать pyspark udf при работе с представлением на основе классов? Это неправильная практика - писать функции, которые вы будете использовать как pyspark udf, в том же файле, где вы будете их вызывать? (в моем случае все мои конечные точки API, работающие с django остальные рамки)
Любая помощь будет оценена, заранее спасибо
ОБНОВЛЕНИЕ:
Эта ссылка и ссылка объясняют, как использовать пользовательские классы с pyspark с использованием SparkContext, но не с SparkSession, как в моем случае, но я использовал это:
sc.sparkContext.addPyFile('path/to/udfFile.py')
Проблема в том, что я определил класс, в котором у меня есть функции для использования в качестве pyspark udf, в том же файле, где я создаю функцию udf для фрейма данных (как показано в моем коде). Я не смог найти, как достичь этого поведения, когда путь addPyFile () находится в том же коде . Несмотря на это, я переместил свой код и выполнил эти шаги (это была еще одна ошибка, которую я исправил):
- Создайте новую папку с именем
udf
- Создайте новый пустой
__ini__.py
файл, чтобы создать каталог для пакета. - И создайте файл file.py для моих функций udf.
core/
udf/
├── __init__.py
├── __pycache__
└── pyspark_udf.py
api/
├── admin.py
├── api.py
├── apps.py
├── __init__.py
В этом файле я попытался импортировать зависимости либо в начале, либо внутри функции. Во всех случаях я получаю ModuleNotFoundError: No module named 'udf'
pyspark_udf.py
import re
import string
import unidecode
from nltk.corpus import stopwords
class TextMiningMethods():
"""docstring for TextMiningMethods"""
def clean_tweet(self,tweet):
# some logic here
Я пытался со всеми из них, В начале моего api.py
файла
from udf.pyspark_udf import TextMiningMethods
# or
from udf.pyspark_udf import *
А внутри функции word_cloud
class BigDataViewSet(viewsets.ViewSet):
def word_cloud(self, request, *args, **kwargs):
from udf.pyspark_udf import TextMiningMethods
В отладчике python эта строка работает:
from udf.pyspark_udf import TextMiningMethods
Но когда я показываю фрейм данных, я получаю ошибку:
clean_tweet_df.show()
ModuleNotFoundError: No module named 'udf'
Очевидно, исходная проблема изменилась на другую, теперь моя проблема больше связана с этим вопросом , но я пока не смог найти удовлетворительный способ импортировать файл и создать pyspark. udf вызывает функцию класса из другой функции класса.
Что мне не хватает?