Сельдерей видит мою задачу, но не выполняет ее - PullRequest
0 голосов
/ 05 февраля 2020

Я пытаюсь запустить цепочку сельдерея локально с брокером Redis.

Проблема: Как вы можете видеть под рабочей командой, задачи не запускаются. Ничего не происходит.

Найдите ниже всю необходимую информацию.

Redis-сервер работает

redis-server
[15048] 05 Feb 11:36:30 * Server started, Redis version 2.4.5
[15048] 05 Feb 11:36:30 * DB loaded from disk: 0 seconds
[15048] 05 Feb 11:36:30 * The server is now ready to accept connections on port 6379
...

Удар сельдерея также работает

celery beat -b redis://localhost:6379/0
celery beat v4.2.1 (windowlicker) is starting.
__    -    ... __   -        _
LocalTime -> 2020-02-05 11:37:30
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.default.Loader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

Но когда я запускаю работника, он может видеть задачи, но не выполняет их:

celery -A module.tasks worker --loglevel=info -Q training_queue -P eventlet

[tasks]
  . module.tasks.generate_the_chain
  . module.tasks.post_pipeline_message
  . module.tasks.train_by_site_post

[2020-02-05 11:39:14,700: INFO/MainProcess] Connected to redis://localhost:6379/0
[2020-02-05 11:39:16,746: INFO/MainProcess] mingle: searching for neighbors
[2020-02-05 11:39:20,804: INFO/MainProcess] mingle: all alone
[2020-02-05 11:39:21,836: INFO/MainProcess] pidbox: Connected to redis://localhost:6379/0.
[2020-02-05 11:39:22,850: INFO/MainProcess] celery@HOST ready.

После этой строки ничего не происходит.

НИЖЕ код:

init .py

import os
from celery import Celery 

CELERY_APP_NAME = 'app_name'
CELERY_BROKER = 'redis://localhost:6379/0'
CELERY_BACKEND = CELERY_BROKER


CELERY = Celery(CELERY_APP_NAME, 
                backend=CELERY_BACKEND,
                broker=CELERY_BROKER,
                include=['module.tasks']
                )

И tasks.py

import os
import logging
import time
from celery import Celery, chain, signature
from celery.schedules import crontab
from module import CELERY


@CELERY.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    site_id = "f1ae"

    sign = signature(
        'module.tasks.generate_the_chain',
        args=(site_id),
        queue='training_queue'
    )
    sender.add_periodic_task(5.0, sign, name='training')


@CELERY.task
def generate_the_chain(site_id):
    print("Hello World")
    """
    This function run the chain of functions and is then
    scheduled with Celery beat
    """

    chain = signature(
        'module.tasks.site_post',
        args=(site_id),
        queue='training_queue'
    )

    chain |= signature(
        'module.tasks.post_pipeline_message',
        args=(),
        queue='training_queue'
    )

    chain.apply_async()


@CELERY.task
def site_post(site_id):

    print(f"Hello {site_id}")

    return True

@CELERY.task
def post_pipeline_message(success):
    if success is True:
        LOGGER.info("Pipeline build with success")
...