Как назвать сельдерею shared_task? - PullRequest
0 голосов
/ 17 сентября 2018

Я пытаюсь использовать stream_framework в своем приложении ( НЕ Django ), но у меня проблема с вызовом общих задач stream_framework. Сельдерей, кажется, находит задачи:

-------------- celery@M3800 v3.1.25 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-4.15.0-34-generic-x86_64-with-Ubuntu-18.04-bionic
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         task:0x7f8d22176dd8
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . formshare.processes.feeds.tasks.test_shared_task
  . stream_framework.tasks.fanout_operation
  . stream_framework.tasks.fanout_operation_hi_priority
  . stream_framework.tasks.fanout_operation_low_priority
  . stream_framework.tasks.follow_many
  . stream_framework.tasks.unfollow_many

[2018-09-17 10:06:28,240: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-09-17 10:06:28,246: INFO/MainProcess] mingle: searching for neighbors
[2018-09-17 10:06:29,251: INFO/MainProcess] mingle: all alone

Я запускаю сельдерей с:

celery -A formshare.processes.feeds.celery_app worker --loglevel=info

Мой celery_app имеет:

from celery import Celery

celeryApp = Celery('task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0', include='formshare.processes.feeds.tasks')

Проблема в том, что delay () не запускает общую задачу. Я также создал общую задачу в своем приложении, но когда я вызываю delay (), эта задача также не вызывается. Я думаю, мне нужно зарегистрировать их как вызываемые из моего приложения? Кажется, я не нахожу никакой информации в Интернете.

Я также попытался автоматически обнаружить задачи, но у меня возникла та же проблема:

celeryApp.autodiscover_tasks(['stream_framework', 'formshare.processes.feeds'],force=True)

Любая идея высоко ценится.

1 Ответ

0 голосов
/ 22 сентября 2018

Общие задачи - это особая вещь, используемая для фактического обмена задачами между различными приложениями (я думаю, в основном это приложения Django, но я использовал их, например, в колбе).

У нас была та же проблема, и чтобы она заработала, мы установили

 celery_app.set_default()

Об экземпляре сельдерея

В противном случае другой способ сделать все правильно - это вызвать задачу через само приложение, так что что-то в этих строках

from celery import current_app
.
.
.
current_app.tasks['my.tasks.to.exec'].delay(something)

Это всегда работает, если это общая задача, и, следовательно, она не привязана ни к какому приложению при импорте, в этом случае оно принадлежит приложению, настроенному как «current_app»

...