Сельдерей: невозможно вызвать задачу через flask, работает за пределами flask - PullRequest
0 голосов
/ 16 января 2020

@ Обновление: изменение задачи с shared_task на app.celeryd.celery.task решает проблему. Существуют ли дополнительные настройки для правильной работы shared_tasks?

Полностью переписана все та же ошибка. Я постараюсь сделать это относительно коротким. Новая структура проекта немного чище и выглядит следующим образом:

proj
|-- app
|   |-- controller
|   |   |-- __init__.py
|   |   +-- greeting_model.py
|   |-- model
|   |   |-- __init__.py
|   |   +-- dto
|   |       |-- __init__.py
|   |       +-- greeting_dto.py
|   |-- service
|   |   |-- __init__.py
|   |   +-- greeting_service.py
|   |-- tasks
|   |   |-- __init__.py
|   |   +-- greeting_tasks.py
|   |-- __init__.py
|   |-- celeryd.py
|   +-- flaskd.py
|-- test.py
|-- worker.py
+-- ws.py

Я инициализирую celery и flask отдельно и предоставляю файл worker.py, который должен быть запущен на клиентских машинах, а ws.py ( flask веб-сервис) будет работать на другом. Инициализация сельдерея проста и использует бэкэнд rp c с брокером RabbitMQ. На данный момент есть две очереди: stati c, но позже они будут заполнены из конфигурации.

from kombu import Queue
from celery import Celery


celery = Celery('LdapProvider',
                broker='amqp://admin:passwd@localhost:5672/dev1',
                backend='rpc',
                include=['app.tasks.greeting_tasks'])
celery.conf.task_queues = (
    Queue("q1", routing_key="c1.q1"),
    Queue("q2", routing_key="c2.q2"),
)

Worker.py (используется для запуска работника сельдерея - слишком упрощен для этого вопроса):

from app.celeryd import celery as celery
from celery.bin.worker import worker


if __name__ == '__main__':
    celeryd = worker(app=celery)

    options = {
        'broker': 'amqp://admin:passwd@localhost:5672/dev1',
        'queues': 'q1',
        'loglevel': 'info',
        'traceback': True
    }

    celeryd.run(**options)

Я пропущу flask инициализацию и перейду кreeting_service.py, который вызывает задачу celery:

# greeting_service.py:
from app.celeryd import celery
from app.tasks.greeting_tasks import say_hello


class GreetingService(object):
    def say_hello(self, name: str) -> str:
        async_result = say_hello.apply_async((name,), queue='q1')
        return async_result.get()



# greeting_tasks.py
from celery import shared_task


@shared_task(bind=True)
def say_hello(self, name: str) -> str:
    return name.capitalize()

Этот вызов завершается с ошибкой через flask, что бы я ни пытался. Я создал test.py только для того, чтобы проверить, работает ли вообще сельдерей:

from app.celeryd import celery
from app.tasks.greeting_tasks import say_hello


if __name__ == '__main__':
    async_result = say_hello.apply_async(('jackie',), queue='q1')
    print(async_result.get())

Почти так же, как приветствие_service.py, оно просто не вызывается из приветствия приветствия, которое является пространством имен flask_restplus. Разница в том, что test.py приводит к:

/home/pupsz/PycharmProjects/provider/venv37/bin/python /home/pupsz/PycharmProjects/provider/test.py
Jackie

Process finished with exit code 0

[2020-01-16 18:56:17,065: INFO/MainProcess] Received task: app.tasks.greeting_tasks.say_hello[bb45e271-563e-405b-8529-7335a3810976]  
[2020-01-16 18:56:17,076: INFO/ForkPoolWorker-2] Task app.tasks.greeting_tasks.say_hello[bb45e271-563e-405b-8529-7335a3810976] succeeded in 0.010257695998006966s: 'Jackie'

, в то время как через flask все, что я получаю, - это уже показано, и в журнале работника не отображается никаких входящих задач, то есть flask apply_asyn c не отправлять задание в RabbitMQ:

File "/home/xyz/PycharmProjects/proj/app/service/greeting_service.py", line 8, in say_hello
return async_result.get()
NotImplementedError: No result backend is configured.
Please see the documentation for more information.

Я обнаружил одну похожую проблему с django без ответа, поэтому я застрял и был бы признателен за какое-то руководство.

1 Ответ

0 голосов
/ 16 января 2020

Решение: Решение о том, чтобы обеспечить совместную работу shared_task, было дано здесь: LINK Изменение инициализации celerz следующим образом:

from kombu import Queue
from celery import Celery


celery = Celery('LdapProvider',
                broker='amqp://admin:passwd@localhost:5672/dev1',
                backend='rpc')
                # include=['app.tasks.greeting_tasks'])
celery.conf.task_queues = (
    Queue("q1", routing_key="c1.q1"),
    Queue("q2", routing_key="c2.q2"),
)
celery.set_default()

Даже если бы я удалил закомментированную строку включения работник успешно выбирает shared_task, определенный в app.tasks.greeting_tasks:

[tasks]
  . app.tasks.greeting_tasks.say_hello

После того, как приложение было установлено в default_app (), больше не генерировалось NotImplementedError, даже при использовании shared_task. Что касается причины ... Я понятия не имею, это было 6 часов проб и ошибок различных конфигов и поиска в Google. В некоторых более сложных ситуациях официальная документация мне не нравится.

...