У меня есть проект, в котором я интегрирую django rest framework (drf) и apache spark, поэтому у меня есть некоторые конечные точки, и мне нужно выполнить некоторые задания с помощью spark, используя udf. У меня есть следующая структура проекта:
core/
udf/
├── pyspark_udf.py
├── (and the rest of the files of a django app)
api/
├── api.py
├── urls.py
├── (and the rest of the files of a django app)
udf.zip
Итак, в соответствии с этой структурой:
- Во-первых, в файле
api.py
у меня есть конечная точка, где мне нужно выполнить job. - Во-вторых, внутри
pyspark_udf.py
я вызываю конечную точку в файле api.py
, делаю там логики c и возвращаю ответ. - Затем опять же в файле
api.py
я использую возвращаемое значение для создания нового столбца в фрейме данных pyspark. - Наконец, я возвращаю json ответ данных в конечной точке drf.
Я добился такого поведения, но используя requests
в функции pyspark_udf.py
, что я считаю, что Это не элегантно или не лучшая практика, работать с API, который находится в моем собственном проекте, вместо простого вызова класса конечной точки drf.
Я создал минимальный код ниже, чтобы воспроизвести ошибку, и если кто-то может найти решение, я буду признателен:
api/urls.py
from django.conf.urls import url
from rest_framework import routers
from .api import (ClassFooViewSet,ClassBarViewSet)
router = routers.DefaultRouter()
urlpatterns = [
url(r'^api/foo/endpointExampleA', ClassFooViewSet.as_view({'post':'endpointExampleA'}), name='endpointExampleA'),
url(r'^api/bar/endpointExampleB', ClassBarViewSet.as_view({'get':'endpointExampleB'}), name='endpointExampleB')
]
urlpatterns += router.urls
api/api.py
from rest_framework import viewsets
from rest_framework import views
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
import json
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from udf.pyspark_udf import UdfExample
class ClassFooViewSet(viewsets.ViewSet):
def __init__(self):
self.response_data = {'error': [], 'data': []}
self.data = {}
self.code = 0
@action(methods=['post'], detail=False)
def endpointExampleA(self, request, *args, **kwargs):
# I also tried to import here the udf
# from udf.pyspark_udf import UdfExample
try:
sc=SparkSession \
.builder \
.master("spark://192.168.0.105:7077") \
.appName('word_cloud') \
.config("spark.executor.memory", '2g') \
.config('spark.executor.cores', '2') \
.config('spark.cores.max', '2') \
.config("spark.driver.memory",'2g') \
.getOrCreate()
sc.sparkContext.addPyFile("core/udf.zip")
schema = StructType([
StructField("id", IntegerType(),True),
StructField("user", StringType(),True),
StructField("text", StringType(),True),
])
df = sc.createDataFrame([(1,"user1",""), (2,"user2",""), (3,"user3","")], schema)
text_udf = udf(UdfExample().msj, StringType())
text_df = df.withColumn("text",text_udf(df["text"]))
text_df.show(text_df.count(),False) # Here, I receive the error
import pdb;pdb.set_trace()
text_df = text_df.select('id','user','text').toJSON().collect()
for elem in text_df:
self.response_data['data'].append(json.loads(elem))
sc.stop()
self.code = status.HTTP_200_OK
except Exception as e:
self.code = status.HTTP_500_INTERNAL_SERVER_ERROR
return Response(self.response_data,status=self.code)
class ClassBarViewSet(viewsets.ModelViewSet):
def __init__(self):
self.response_data = {'error': [], 'data': []}
self.data = {}
@action(methods=['get'], detail=False)
def endpointExampleB(self, request, *args, **kwargs):
self.response_data['data'].append({"msj":"Hello World"})
return Response(self.response_data,status=status.HTTP_200_OK)
udf/pyspark_udf.py
# from api.api import ClassBarViewSet # This line fails
class UdfExample():
def __init__(self):
self.response_data = {'error': [], 'data': []}
self.data = {}
def msj(self, tweet):
import json
import requests
# from api.api import ClassBarViewSet # This line also fails here
try:
# This works
response = requests.get("http://localhost:8000/api/bar/endpointExampleB")
response = response.content.decode('utf-8')
json_response = json.loads(response)
return json_response['data']
# But I'd like to use this code, I can't import
# api.api so this part doesn't works
classB = ClassBarViewSet()
classB.endpointExampleB(request)
self.response_data = classB.response_data['data']
except Exception as e:
self.response_data['error'].append(str(e))
return self.response_data
Ошибка, которую я получаю при попытке сделать: from api.api import ClassBarViewSet
: ImportError: cannot import name 'ClassBarViewSet'
Что нужно знать об ошибке и соответствующем коде:
- Сначала я создал папку udf только с файлом
pyspark_udf.py
и файлом __init.py__
, но я получал ошибку You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings.
, я пытался с несколькими ответами ОС, связанными с экспортом переменной DJANGO_SETTINGS_MODULE
, пытаясь выполнить сервер с параметром --settings=core.settings
(я использую только один файл settings.py
), но я не смог найти решение Таким образом, я создал приложение django, а затем исправил ошибку. Наконец, я сгенерировал папку udf с помощью команды zip -r udf udf
с верхнего уровня, я имею в виду полную папку udf. - В фрейме данных pyspark у меня есть другой столбец, сгенерированный другим pyspark udf, в этой функции Я не использую конечную точку drf и работает хорошо.
- Если я попытаюсь импортировать django модель, подобную
from api.models import (User)
, внутри функции msj () в файле pyspark_udf.py
, то я получу эту ошибку: ModuleNotFoundError: No module named 'api'
то есть связано со способом создания .zip
. Если я импортирую модель вверху файла, эта ошибка не появится, я не понимаю, почему. - У меня есть слой внешнего интерфейса, куда я импортирую API и звонки на конечные точки, без каких-либо проблема, так же, как я пытаюсь импортировать в файл
pyspark_udf.py
.
Так почему я не могу импортировать API в приложение pyspark udf? Я искал, но я не мог найти другой вопрос, связанный. Заранее спасибо за любую помощь!