Я пытаюсь запустить цепочку сельдерея локально с брокером Redis.
Проблема: Как вы можете видеть под рабочей командой, задачи не запускаются. Ничего не происходит.
Найдите ниже всю необходимую информацию.
Redis-сервер работает
redis-server
[15048] 05 Feb 11:36:30 * Server started, Redis version 2.4.5
[15048] 05 Feb 11:36:30 * DB loaded from disk: 0 seconds
[15048] 05 Feb 11:36:30 * The server is now ready to accept connections on port 6379
...
Удар сельдерея также работает
celery beat -b redis://localhost:6379/0
celery beat v4.2.1 (windowlicker) is starting.
__ - ... __ - _
LocalTime -> 2020-02-05 11:37:30
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.default.Loader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%WARNING
. maxinterval -> 5.00 minutes (300s)
Но когда я запускаю работника, он может видеть задачи, но не выполняет их:
celery -A module.tasks worker --loglevel=info -Q training_queue -P eventlet
[tasks]
. module.tasks.generate_the_chain
. module.tasks.post_pipeline_message
. module.tasks.train_by_site_post
[2020-02-05 11:39:14,700: INFO/MainProcess] Connected to redis://localhost:6379/0
[2020-02-05 11:39:16,746: INFO/MainProcess] mingle: searching for neighbors
[2020-02-05 11:39:20,804: INFO/MainProcess] mingle: all alone
[2020-02-05 11:39:21,836: INFO/MainProcess] pidbox: Connected to redis://localhost:6379/0.
[2020-02-05 11:39:22,850: INFO/MainProcess] celery@HOST ready.
После этой строки ничего не происходит.
НИЖЕ код:
init .py
import os
from celery import Celery
CELERY_APP_NAME = 'app_name'
CELERY_BROKER = 'redis://localhost:6379/0'
CELERY_BACKEND = CELERY_BROKER
CELERY = Celery(CELERY_APP_NAME,
backend=CELERY_BACKEND,
broker=CELERY_BROKER,
include=['module.tasks']
)
И tasks.py
import os
import logging
import time
from celery import Celery, chain, signature
from celery.schedules import crontab
from module import CELERY
@CELERY.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
site_id = "f1ae"
sign = signature(
'module.tasks.generate_the_chain',
args=(site_id),
queue='training_queue'
)
sender.add_periodic_task(5.0, sign, name='training')
@CELERY.task
def generate_the_chain(site_id):
print("Hello World")
"""
This function run the chain of functions and is then
scheduled with Celery beat
"""
chain = signature(
'module.tasks.site_post',
args=(site_id),
queue='training_queue'
)
chain |= signature(
'module.tasks.post_pipeline_message',
args=(),
queue='training_queue'
)
chain.apply_async()
@CELERY.task
def site_post(site_id):
print(f"Hello {site_id}")
return True
@CELERY.task
def post_pipeline_message(success):
if success is True:
LOGGER.info("Pipeline build with success")