Сельдерей - 'Получено незарегистрированное задание типа' с Flask, шаблон фабрики приложений - PullRequest
0 голосов
/ 04 февраля 2020

Я использую Celery с Flask, используя шаблон фабрики приложений. Сельдерей получит задачу, но он говорит, что задача не зарегистрирована. Я запускаю свое приложение с flask run, FLASK_APP=app.factory::create_app().

Рабочий из сельдерея запускается с помощью celery -A app.celery worker --pool=eventlet. Задача получена, и я получаю логин из консоли, но выдает ошибку, как показано ниже.

Получено незарегистрированное задание типа 'app.tasks.do_stuff'. Сообщение было проигнорировано и отброшено.

Не забыли ли вы импортировать модуль, содержащий это задание? Или, может быть, вы используете относительный импорт?

В чем я ошибся?

Код

app.__init__.py

from app.config import BaseConfig
from celery import Celery

celery = Celery(__name__,
                broker=BaseConfig.CELERY_BROKER_URL,
                backend=BaseConfig.CELERY_RESULT_BACKEND)

class BaseConfig(object)
    CELERY_BROKER_URL = 'redis://localhost:6379',
    CELERY_RESULT_BACKEND = 'redis://localhost:6379'
    CELERY_TASK_SERIALIZER = 'pickle'
    CELERY_RESULT_SERIALIZER = 'pickle'
    CELERY_ACCEPT_CONTENT = ['pickle']
    CELERY_IMPORTS = ['app.tasks']


app.factory.py

######################################
#### Application Factory Function ####
######################################

def create_app(config=BaseConfig):

    # Initialise the Flask app
    app = Flask(__name__)
    if config == 'test':
        app.config.from_object(TestConfig)
    else:
        app.config.from_object(BaseConfig)
    initialise_extensions(app)
    register_template_filters(app)
    app.register_blueprint(core)
    app.register_blueprint(api)
    app.register_blueprint(commands)
    register_shell_context(app)
    return app


##########################
#### Helper Functions ####
##########################


def initialise_extensions(app):
    # Since the application instance is now created, pass it to each Flask
    # extension instance to bind it to the Flask application instance (app)
    csrf.init_app(app)
    db.init_app(app)
    migrate.init_app(app, db)
    # Login
    login.init_app(app)
    login.login_view = 'core.login'
    init_celery(app, celery)
app.tasks.py


from . import celery


@celery.task(serializer='pickle')
def do_stuff(data, user):
    pass
...