Работая над приложением flask, я недавно реорганизовал его в несколько модулей. Теперь, когда я распространял код сельдерея, он перестал работать
flask_celery.py
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
app.py
from flask import Flask,has_request_context,request
from apis import api
from flask_celery import make_celery
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
api.init_app(app)
celery = make_celery(app)
if __name__ == '__main__':
app.run(debug=True)
task.py
from celery import current_app as celery
@celery.task(name="this is my task")
def shutdown():
print('worked')
api.py
import task
@snap.route('/<string:uuid>/shutdown')
class shutdown(Resource):
def post(self,uuid):
task.shutdown.delay()
return 'shutdown_initiated',202,header

Показывает задачу в списке задач, но никогда не получает ее.