Как импортировать API django rest framework в pyspark udf executor? - PullRequest
1 голос
/ 12 апреля 2020

У меня есть проект, в котором я интегрирую django rest framework (drf) и apache spark, поэтому у меня есть некоторые конечные точки, и мне нужно выполнить некоторые задания с помощью spark, используя udf. У меня есть следующая структура проекта:

core/
    udf/
    ├── pyspark_udf.py
    ├── (and the rest of the files of a django app)
    api/
    ├── api.py
    ├── urls.py
    ├── (and the rest of the files of a django app)
    udf.zip

Итак, в соответствии с этой структурой:

  1. Во-первых, в файле api.py у меня есть конечная точка, где мне нужно выполнить job.
  2. Во-вторых, внутри pyspark_udf.py я вызываю конечную точку в файле api.py, делаю там логики c и возвращаю ответ.
  3. Затем опять же в файле api.py я использую возвращаемое значение для создания нового столбца в фрейме данных pyspark.
  4. Наконец, я возвращаю json ответ данных в конечной точке drf.

Я добился такого поведения, но используя requests в функции pyspark_udf.py, что я считаю, что Это не элегантно или не лучшая практика, работать с API, который находится в моем собственном проекте, вместо простого вызова класса конечной точки drf.

Я создал минимальный код ниже, чтобы воспроизвести ошибку, и если кто-то может найти решение, я буду признателен:

api/urls.py

from django.conf.urls import url
from rest_framework import routers
from .api import (ClassFooViewSet,ClassBarViewSet)

router = routers.DefaultRouter()

urlpatterns = [
    url(r'^api/foo/endpointExampleA', ClassFooViewSet.as_view({'post':'endpointExampleA'}), name='endpointExampleA'),
    url(r'^api/bar/endpointExampleB', ClassBarViewSet.as_view({'get':'endpointExampleB'}), name='endpointExampleB')
]

urlpatterns += router.urls

api/api.py

from rest_framework import viewsets
from rest_framework import views
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
import json

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from udf.pyspark_udf import UdfExample

class ClassFooViewSet(viewsets.ViewSet):

    def __init__(self):
        self.response_data = {'error': [], 'data': []}
        self.data = {}
        self.code = 0

    @action(methods=['post'], detail=False)
    def endpointExampleA(self, request, *args, **kwargs):
        # I also tried to import here the udf
        # from udf.pyspark_udf import UdfExample

        try:

            sc=SparkSession \
                .builder \
                .master("spark://192.168.0.105:7077") \
                .appName('word_cloud') \
                .config("spark.executor.memory", '2g') \
                .config('spark.executor.cores', '2') \
                .config('spark.cores.max', '2') \
                .config("spark.driver.memory",'2g') \
                .getOrCreate()

            sc.sparkContext.addPyFile("core/udf.zip")

            schema = StructType([
                StructField("id", IntegerType(),True),
                StructField("user", StringType(),True),    
                StructField("text", StringType(),True),
            ])

            df = sc.createDataFrame([(1,"user1",""), (2,"user2",""), (3,"user3","")], schema)

            text_udf = udf(UdfExample().msj, StringType())

            text_df = df.withColumn("text",text_udf(df["text"]))

            text_df.show(text_df.count(),False) # Here, I receive the error
            import pdb;pdb.set_trace()

            text_df = text_df.select('id','user','text').toJSON().collect()

            for elem in text_df:                    
                self.response_data['data'].append(json.loads(elem))

            sc.stop()
            self.code = status.HTTP_200_OK

        except Exception as e:
            self.code = status.HTTP_500_INTERNAL_SERVER_ERROR
        return Response(self.response_data,status=self.code)

class ClassBarViewSet(viewsets.ModelViewSet):
    def __init__(self):
        self.response_data = {'error': [], 'data': []}
        self.data = {}

    @action(methods=['get'], detail=False)
    def endpointExampleB(self, request, *args, **kwargs):
        self.response_data['data'].append({"msj":"Hello World"})
        return Response(self.response_data,status=status.HTTP_200_OK)

udf/pyspark_udf.py

# from api.api import ClassBarViewSet # This line fails

class UdfExample():

    def __init__(self):
        self.response_data = {'error': [], 'data': []}
        self.data = {}

    def msj(self, tweet):
        import json
        import requests 
        # from api.api import ClassBarViewSet   # This line also fails here

        try:

            # This works
            response = requests.get("http://localhost:8000/api/bar/endpointExampleB")
            response = response.content.decode('utf-8')
            json_response = json.loads(response)
            return json_response['data']

            # But I'd like to use this code, I can't import
            # api.api so this part doesn't works
            classB = ClassBarViewSet()
            classB.endpointExampleB(request)
            self.response_data = classB.response_data['data']

        except Exception as e:
            self.response_data['error'].append(str(e))
        return self.response_data

Ошибка, которую я получаю при попытке сделать: from api.api import ClassBarViewSet: ImportError: cannot import name 'ClassBarViewSet'

Что нужно знать об ошибке и соответствующем коде:

  • Сначала я создал папку udf только с файлом pyspark_udf.py и файлом __init.py__, но я получал ошибку You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings., я пытался с несколькими ответами ОС, связанными с экспортом переменной DJANGO_SETTINGS_MODULE, пытаясь выполнить сервер с параметром --settings=core.settings (я использую только один файл settings.py), но я не смог найти решение Таким образом, я создал приложение django, а затем исправил ошибку. Наконец, я сгенерировал папку udf с помощью команды zip -r udf udf с верхнего уровня, я имею в виду полную папку udf.
  • В фрейме данных pyspark у меня есть другой столбец, сгенерированный другим pyspark udf, в этой функции Я не использую конечную точку drf и работает хорошо.
  • Если я попытаюсь импортировать django модель, подобную from api.models import (User) , внутри функции msj () в файле pyspark_udf.py, то я получу эту ошибку: ModuleNotFoundError: No module named 'api' то есть связано со способом создания .zip. Если я импортирую модель вверху файла, эта ошибка не появится, я не понимаю, почему.
  • У меня есть слой внешнего интерфейса, куда я импортирую API и звонки на конечные точки, без каких-либо проблема, так же, как я пытаюсь импортировать в файл pyspark_udf.py.

Так почему я не могу импортировать API в приложение pyspark udf? Я искал, но я не мог найти другой вопрос, связанный. Заранее спасибо за любую помощь!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...