Контекст ближней искры, запущенный через задачу Celery в приложении django - PullRequest
0 голосов
/ 02 ноября 2019

Я использую Pyspark вместе с Celery в приложении Django. Итак, поток моего кода выглядит следующим образом: 1. Поместите запрос POST для загрузки файла (большой файл). 2. Django обрабатывает запрос и загружает файл в hdfs. Этот большой файл в hdfs читается pyspark, чтобы загрузить его в кассандру. 3. Эта загрузка обрабатывается Celery (от чтения файла до загрузки cassandra). Celery запускает процесс в фоновом режиме и запускает контекст искры, чтобы начать загрузку. 4. Данные загружаются в cassandra, , но контекст искры, созданный с помощью сельдерея, не останавливается даже после использования spark.stop () , когда загрузка завершена.

project -> celery.py

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

tasks.py

import celery
from project.celery import app
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession

class uploadfile():
    def __init__(self):
        self.cluster = Cluster(getattr(settings, "CASSANDRA_IP", ""))
        self.session = self.cluster.connect()
    def start_spark(self):
        self.spark = SparkSession.builder.master(getattr(settings,'SPARK_MASTER', settings.SPARK_MASTER))\
                                .appName('Load CSV to Cassandra')\
                                .config('spark.jars', self.jar_files_path)\
                                .config('spark.cassandra.connection.host', getattr(settings,'SPARK_CASSANDRA_CONNECTION_HOST','0.0.0.0'))\
                                .getOrCreate()
    def spark_stop(self):
        self.spark.stop()
    def file_upload(self):
        self.start_spark()
        df = self.spark.read.csv(file_from_hdfs)
        # do some operation on the dataframe
        # self.session.create_cassandra_table_if_does_not_exist
        df.write.format('org.apache.spark.sql.cassandra').\
                    .option('table',table_name)\
                    .option('keyspace',keyspace)\
                    .mode('append').save()
        self.spark_stop()  <<<-------------------- This does not close the spark context

@task(name="api.tasks.uploadfile")
def csv_upload():
    # handle request.FILE and upload the file to hdfs
    spark_obj = uploadfile()
    spark_obj.file_upload()

call_task_script.py

from task import csv_upload
from rest_framework.views import APIView

class post_it(APIView):
    def post(request):
        csv_upload.delay()
        return Response('success')

...