Я добавил работника из сельдерея в пакет app
:
proj
├── app
│ ├── worker.py
│ └── server.py
└── db
├── db.sql
├── check_db_health.py
└── documents.md
Я могу запустить работника обоими способами:
#1st way
user@host:~/proj
$ celery worker -A app.worker
#2nd way
user@host:~/proj
$ python -m app.worker
Но между ними другое поведение.
- Второй не имеет настраиваемых аргументов для работника, поэтому я не хочу его использовать. Но это работает ОТЛИЧНО! ошибки не найдены.
- Первый тоже хорошо загружается, но при запуске задачи обычно встречается ошибка импорта модуля из-за реального кода проекта, имеющего цепочку импорта динамических модулей.
server.py
from app.worker import enqueue
def sum(a, b):
return a+b
enqueue(sum, a, b)
worker.py
def enqueue(callback, *args, **kwargs):
module_path = inspect.getfile(callback)
module_name = inspect.getmodule(callback).__name__
func_name = callback.__name__
no_delay = kwargs.pop('no_delay', False)
return do_legacy_task.apply_async(args=args, kwargs=dict(kwargs, **{
"module_name": module_name,
"module_path": module_path,
"func_name": func_name
}))
return None
@task(bind=True, name="app.worker.do_callback_task")
do_legacy_task(self, *args, **kwargs):
clean_kwargs = copy.deepcopy(kwargs)
module_path = clean_kwargs.pop('module_path')
module_name = clean_kwargs.pop('module_name')
func_name = clean_kwargs.pop('func_name')
spec = importlib.util.spec_from_file_location(module_name, module_path)
if spec is None:
print("can't find the module %s in file %s" % (module_name, module_path,))
else:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
sys.modules[module_name] = module
callback = getattr(module, func_name)
return callback(*args, **clean_kwargs)
return None
Техника динамического импорта сервера и работника основана на этом примере
ошибка
File "/Users/johndoe/proj/app/server.py", line 16, in <module>
from db.check_db_health import CodeDiagnostic
ModuleNotFoundError: No module named 'db'
Почему двухстороннее поведение отличается при импорте других модулей?
Как исправить ошибку выше?
UPDATE
Я обнаружил, что могу пропустить эту ошибку, передав дополнительный аргумент для Celery:
user@host:~/proj
$ celery worker -A app.worker --include db.check_db_health
Было бы замечательно, если бы был способ сделать это автоматически.