Поэтому, следуя совету @Patricio, казалось, что это действительно была ошибка импорта.Моя новая структура каталогов выглядит следующим образом:
.
├── __init__.py
├── celeryConfig
│ ├── __init__.py
│ └── celeryApp.py
├── entry.py
├── state1
│ ├── __init__.py
│ ├── family1
│ │ ├── __init__.py
│ │ ├── task1.py
│ │ ├── task2.py
│ │ └── task3.py
│ └── family2
│ ├── __init__.py
│ └── task1.py
└── state2
├── __init__.py
├── family1
│ ├── __init__.py
│ ├── task1.py
│ └── task2.py
└── family2
├── __init__.py
├── task1.py
└── task2.py
, тогда как содержимое celeryConfig/celeryApp.py
выглядит следующим образом:
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost', include=['state1.family1.task1'])
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
, а содержимое taskn.py выглядит примерно так:
from celeryConfig.celeryApp import celapp
import time
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
, в то время как entry.py
остается, как есть, с одним изменением, как показано ниже:
from state1.family1.task1 import t1
А теперь, когда сельдерей запускается как: celery -A celeryConfig.celeryApp worker -l info
из корневого каталога project
все работает нормально.В качестве результата вышеприведенной команды я получаю сообщение
.
.
.
[tasks]
. state1.family1.task1.t1
.
.
.
, указывающее, что сельдерей запущен правильно и что задача действительно была зарегистрирована.Итак, теперь, чтобы зарегистрировать все задачи, я могу прочитать каталог / каталоги и динамически создать список include
в celeryApp.py
.(Опубликуем больше об этом, как только сделаем)
Спасибо @ Патрисио