Я использую Pyspark вместе с Celery в приложении Django. Итак, поток моего кода выглядит следующим образом: 1. Поместите запрос POST для загрузки файла (большой файл). 2. Django обрабатывает запрос и загружает файл в hdfs. Этот большой файл в hdfs читается pyspark, чтобы загрузить его в кассандру. 3. Эта загрузка обрабатывается Celery (от чтения файла до загрузки cassandra). Celery запускает процесс в фоновом режиме и запускает контекст искры, чтобы начать загрузку. 4. Данные загружаются в cassandra, , но контекст искры, созданный с помощью сельдерея, не останавливается даже после использования spark.stop () , когда загрузка завершена.
project -> celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
tasks.py
import celery
from project.celery import app
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
class uploadfile():
def __init__(self):
self.cluster = Cluster(getattr(settings, "CASSANDRA_IP", ""))
self.session = self.cluster.connect()
def start_spark(self):
self.spark = SparkSession.builder.master(getattr(settings,'SPARK_MASTER', settings.SPARK_MASTER))\
.appName('Load CSV to Cassandra')\
.config('spark.jars', self.jar_files_path)\
.config('spark.cassandra.connection.host', getattr(settings,'SPARK_CASSANDRA_CONNECTION_HOST','0.0.0.0'))\
.getOrCreate()
def spark_stop(self):
self.spark.stop()
def file_upload(self):
self.start_spark()
df = self.spark.read.csv(file_from_hdfs)
# do some operation on the dataframe
# self.session.create_cassandra_table_if_does_not_exist
df.write.format('org.apache.spark.sql.cassandra').\
.option('table',table_name)\
.option('keyspace',keyspace)\
.mode('append').save()
self.spark_stop() <<<-------------------- This does not close the spark context
@task(name="api.tasks.uploadfile")
def csv_upload():
# handle request.FILE and upload the file to hdfs
spark_obj = uploadfile()
spark_obj.file_upload()
call_task_script.py
from task import csv_upload
from rest_framework.views import APIView
class post_it(APIView):
def post(request):
csv_upload.delay()
return Response('success')