Celery: проблема с задачами маршрутизации - только один работник потребляет все задачи из всех очередей - PullRequest
1 голос
/ 10 июля 2019

У меня есть некоторые задачи с настроенными вручную маршрутами и 3 рабочими, которые были настроены на использование задач из определенной очереди. Но только один работник выполняет все задачи, и я не знаю, как решить эту проблему.

Мой celeryconfig.py

    class CeleryConfig:
    enable_utc = True
    timezone = 'UTC'

    imports = ('events.tasks')
    broker_url = Config.BROKER_URL
    broker_transport_options = {'visibility_timeout': 10800}  # 3H

    worker_hijack_root_logger = False

    task_protocol = 2
    task_ignore_result = True
    task_publish_retry_policy = {'max_retries': 3, 'interval_start': 0, 'interval_step': 0.2, 'interval_max': 0.2}
    task_time_limit = 30  # sec
    task_soft_time_limit = 15  # sec

    task_default_queue = 'low'
    task_default_exchange = 'low'
    task_default_routing_key = 'low'

    task_queues = (
        Queue('daily', Exchange('daily'), routing_key='daily'),
        Queue('high', Exchange('high'), routing_key='high'),
        Queue('normal', Exchange('normal'), routing_key='normal'),
        Queue('low', Exchange('low'), routing_key='low'),
        Queue('service', Exchange('service'), routing_key='service'),
        Queue('award', Exchange('award'), routing_key='award'),
    )

    task_route = {
        # -- SCHEDULE QUEUE --
        base_path.format(task='refresh_rank'): {'queue': 'daily'}
        # -- HIGH QUEUE --
        base_path.format(task='execute_order'): {'queue': 'high'},
        # -- NORMAL QUEUE --
        base_path.format(task='calculate_cost'): {'queue': 'normal'},
        # -- SERVICE QUEUE --
        base_path.format(task='send_pin'): {'queue': 'service'},
        # -- LOW QUEUE
        base_path.format(task='invite_to_tournament'): {'queue': 'low'},
        # -- AWARD QUEUE
        base_path.format(task='get_lesson_award'): {'queue': 'award'},
        # -- TEST TASK

    worker_concurrency = multiprocessing.cpu_count() * 2 + 1
    worker_prefetch_multiplier = 1  #
    worker_max_tasks_per_child = 1
    worker_max_memory_per_child = 90000  # 90MB

    beat_max_loop_interval = 60 * 5  # 5 min

Я работаю в докере, часть моего stack.yml

    version: "3.7"

    services:

      worker_high:
        command: celery worker -l debug -A runcelery.celery -Q high -n worker.high@%h

      worker_normal:
        command: celery worker -l debug -A runcelery.celery -Q normal,award,service,low -n worker.normal@%h

      worker_schedule:
        command: celery worker -l debug -A runcelery.celery -Q daily -n worker.schedule@%h

      beat:
        command: celery beat -l debug -A runcelery.celery

      flower:
        command: flower -l debug -A runcelery.celery --port=5555 

      broker:
        image: redis:5.0-alpine

Я думал, что моя конфигурация верна, и команда запускается корректно, но журналы и цветок докера показали, что только worker.normal потребляет все задачи.

I flower screen

Обновление

Вот часть task.py:

def refresh_rank_in_tournaments():
    logger.debug(f'Start task refresh_rank_in_tournaments')
    return AnalyticBackgroundManager.refresh_tournaments_rank()

base_path - это ярлык для полного пути к задаче: base_path = 'events.tasks.{task}'

execute_order код задачи:

    @celery.task(bind=True, default_retry_delay=5)
    def execute_order(self, private_id, **kwargs):
        try:
            return OrderBackgroundManager.execute_order(private_id, **kwargs)
        except IEXException as exc:
            raise self.retry(exc=exc)

Эта задача будет вызываться в представлении как tasks.execute_order.delay(id)

1 Ответ

0 голосов
/ 10 июля 2019

Ваши worker.normal подписаны на обычные, наградные, сервисные, низкие очереди. Кроме того, очередь low является заданной по умолчанию, поэтому каждая задача, у которой нет явно установленной очереди, будет выполняться в worker.normal .

...