Ошибка flask RuntimeError: Working outside of application context.
происходит потому, что вы не в Flask application_context () .
Вы должны использовать celery shared_task , что что вам нужно, учитывая структуру MVC.
celery_flask/
├── celery_tasks
│ ├── app_tasks.py
│ ├── __init__.py
├── celery_worker.py
├── controllers
│ ├── __init__.py
│ ├── some_controller.py
├── __init__.py
└── server.py
Script app_tasks.py
#=====================
# app_tasks.py
#=====================
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task(name='celery_tasks.add_together')
def add_together(x, y):
return x + y
Декоратор @ shared_task возвращает прокси, который всегда указывает на активные экземпляры Celery:
>>> from celery import Celery, shared_task
>>> @shared_task
... def add_together(x, y):
... return x + y
...
>>> app1 = Celery(broker='amqp://')
>>> add_together.app is app1
True
>>> app2 = Celery(broker='redis://')
>>> add_together.app is app2
True
После того, как вы определили свою задачу, вы можете вызвать их, используя ссылку на приложение Celery. Это приложение сельдерея может быть частью flask application_context () . Пример:
Сценарий server.py
from __future__ import absolute_import
from flask import Flask
from celery import Celery
from controllers.some_controller import controller
flask_app = Flask(__name__)
flask_app.secret_key = 'SECRET'
flask_app.register_blueprint(controller)
# Celery Configuration
def make_celery( app ):
celery = Celery('flask-celery-app', backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'],
include=['celery_tasks.app_tasks'])
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
def list_celery_task( ):
from celery.task.control import inspect
i = inspect()
i.registered_tasks()
from itertools import chain
t = set(chain.from_iterable( i.registered_tasks().values() ))
print "registered_tasks={}".format( t )
#======================================
# MAIN
#======================================
flask_app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)
flask_app.celery = celery
list_celery_task( )
if __name__ == "__main__":
flask_app.run(host='0.0.0.0', debug=True)
Сценарий some_controller.py
#============================
# some_controller.py
#============================
from __future__ import absolute_import
from flask import Blueprint
from flask import current_app
controller = Blueprint('controller', __name__,
template_folder='templates/')
@controller.route('/add', methods=['GET'])
def add():
print "calling add"
result = current_app.celery.send_task('celery_tasks.add_together',args=[12,6])
r = result.get()
print 'Processing is {}'.format( r )
return 'Processing is {}'.format( r )
Наконец, запустите рабочий для выполнения задач:
celery -A celery_worker worker --loglevel=DEBUG
скрипт celery_worker.py
#============================
# celery_worker.py
#============================
from __future__ import absolute_import
from celery import Celery
# Celery Configuration
def make_celery():
celery = Celery('flask-celery-app', backend='redis://localhost:6379',
broker='redis://localhost:6379',
include=['celery_tasks.app_tasks'])
return celery
celery = make_celery()
print "tasks={}".format( celery.tasks.keys() )