Как получить flask контекст запроса в задаче сельдерея? - PullRequest
0 голосов
/ 10 января 2020

У меня есть сервер flask, работающий в оружейном мозге. В моем приложении flask я хочу обрабатывать большие файлы загрузки (> 20 ГБ), поэтому я планирую разрешить задаче celery выполнять обработку большого файла.

Проблема заключается в том, что файл извлекается из request.files уже занимает довольно много времени, в то время как gunicorn прекращает работу над этим запросом. Я мог бы увеличить время ожидания, но максимальный размер файла в настоящее время неизвестен, поэтому я не знаю, сколько времени мне понадобится.

Я планировал сделать контекст запроса доступным для задачи celery, так как это было описано здесь: http://xion.io/post/code/celery-include-flask-request-context.html, но я не могу заставить его работать

Q1 Подпись правильна?

Я установил подпись с celery.signature(handle_large_file, args={}, kwargs={}) и ничего не жалуется. Я получаю аргументы, которые передаю от обработчика запроса flask к задаче celery, но это все. Должен ли я каким-то образом получить дескриптор контекста здесь?

Q2 как использовать контекст?

Я бы подумал, что контекст запроса flask доступен. мог просто использовать request.files в моем коде, но тогда я получаю предупреждение, что я вне контекста.

Использование сельдерея 4.4.0

Код:

# in celery.py:
from flask import request
from celery import Celery

celery = Celery('celery_worker',
                backend=Config.CELERY_RESULT_BACKEND,
                broker=Config.CELERY_BROKER_URL)

@celery.task(bind=True)
def handle_large_file(task_object, data):
   # do something with the large file...
   # what I'd like to do:
   files = request.files['upfile']
   ...

celery.signature(handle_large_file, args={}, kwargs={})
# in main.py
def create_app():
   app = Flask(__name__.split('.')[0])
   ...
   celery_worker.conf.update(app.config)

   # copy from the blog
   class RequestContextTask(Task):...
   celery_worker.Task = RequestContextTask
# in Controller.py
@FILE.route("", methods=['POST'])
def upload():
    data = dict()
    ...
    handle_large_file.delay(data)

Чего мне не хватает?

...