Сельдерей: как получить размер очереди надежным и тестируемым способом - PullRequest
0 голосов
/ 29 апреля 2020

Я схожу с ума, пытаясь найти надежный и проверяемый способ получить количество задач, содержащихся в данной очереди Celery. Я уже читал эти 2 обсуждения на SO:

Django Сельдерей получает счетчик задач (только примечание: я не использую Django или какой-либо Python веб-фреймворк )

Получить список задач в очереди в Celery

Но я не смог решить мою проблему, следуя нескольким советам. Я использую Redis в качестве бэкэнда, но мне бы хотелось иметь независимое от бэкенда и гибкое решение, особенно для тестов. Это моя текущая ситуация: я определил класс EnhancedCelery, который наследуется от Celery и добавляет пару методов, в частности get_queue_size() - это тот, который я пытаюсь правильно реализовать / проверить. Ниже приведен код в моем тестовом примере:

celery_test_app = EnhancedCelery(__name__)

# this is needed to avoid exception for ping command
# which is automatically triggered by the worker once started
celery_test_app.loader.import_module('celery.contrib.testing.tasks')

# in memory backend
celery_test_app.conf.broker_url = 'memory://'
celery_test_app.conf.result_backend = 'cache+memory://'

# We have to setup queues manually, 
# since it seems that auto queue creation doesn't work in tests :(
celery_test_app.conf.task_create_missing_queues = False
celery_test_app.conf.task_default_queue = 'default'
celery_test_app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('queue_1', routing_key='q1'),
    Queue('queue_2', routing_key='q2'),
    Queue('queue_3', routing_key='q3'),
)
celery_test_app.conf.task_default_exchange = 'tasks'
celery_test_app.conf.task_default_exchange_type = 'topic'
celery_test_app.conf.task_default_routing_key = 'task.default'
celery_test_app.conf.task_routes = {
    'sample_task': {
        'queue': 'default',
        'routing_key': 'task.default',
    },
    'sample_task_in_queue_1': {
        'queue': 'queue_1',
        'routing_key': 'q1',
    },
    'sample_task_in_queue_2': {
        'queue': 'queue_2',
        'routing_key': 'q2',
    },
    'sample_task_in_queue_3': {
        'queue': 'queue_3',
        'routing_key': 'q3',
    },
}


@celery_test_app.task()
def sample_task():
    return 'sample_task_result'


@celery_test_app.task(queue='queue_1')
def sample_task_in_queue_1():
    return 'sample_task_in_queue_1_result'


@celery_test_app.task(queue='queue_2')
def sample_task_in_queue_2():
    return 'sample_task_in_queue_2_result'


@celery_test_app.task(queue='queue_3')
def sample_task_in_queue_3():
    return 'sample_task_in_queue_3_result'


class EnhancedCeleryTest(TestCase):
    def test_get_queue_size_returns_expected_value(self):
        def add_task(task):
            task.apply_async()

        with start_worker(celery_test_app):
            for _ in range(7):
                add_task(sample_task_in_queue_1)

            for _ in range(4):
                add_task(sample_task_in_queue_2)

            for _ in range(2):
                add_task(sample_task_in_queue_3)

            self.assertEqual(celery_test_app.get_queue_size('queue_1'), 7)
            self.assertEqual(celery_test_app.get_queue_size('queue_2'), 4)
            self.assertEqual(celery_test_app.get_queue_size('queue_3'), 2)

Это попытки реализации get_queue_size()

.1, которые всегда возвращают ноль (jobs == 0)

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        with self.connection_or_acquire() as connection:
            channel = connection.default_channel

            try:
                name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
                return jobs
            except (ChannelError, NotFound):
                pass

.2 всегда возвращает ноль

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        inspection = self.control.inspect()

        return inspection.active() # zero!

        # or:

        return inspection.scheduled() # zero!

        # or:

        return inspection.reserved() # zero!

.3 он работает, возвращая ожидаемое число для каждой очереди, но только в тесте (потому что свойство «очереди» не существует при использовании redis)

    def get_queue_size(self, queue_name: str) -> Optional[int]:
        with self.connection_or_acquire() as connection:
            channel = connection.default_channel

            if hasattr(channel, 'queues'):
                queue = channel.queues.get(queue_name)

                if queue is not None:
                    return queue.unfinished_tasks

Ответы [ 2 ]

0 голосов
/ 29 апреля 2020

Вы можете увидеть, как это реализовано в Flower (монитор реального времени для Celery) здесь Они имеют различную реализацию класса Broker для redis и rabbitmq .

Другой способ - использовать события задачи сельдерея : подсчитать, сколько задач было отправлено и сколько было успешно / неудачно

0 голосов
/ 29 апреля 2020

Ни одно из упомянутых вами решений не является полностью правильным, по моему скромному мнению. Как вы уже упоминали, это спецификация backend c, поэтому вам нужно будет обернуть обработчики для всех backend, поддерживаемых Celery, чтобы обеспечить проверку очереди backend-agnosti c. В случае с Redis вы должны напрямую подключиться к Redis и LLEN очереди, которую вы хотите проверить. В случае RabbitMQ вы найдете эту информацию совершенно по-другому. Та же история с SQS ...

Все это обсуждалось в Получить список задач в очереди в Celery thread ...

Наконец, есть Причина, по которой Celery не предоставляет эту функциональность из коробки - информация, я считаю, бесполезна. К тому времени, когда вы получите то, что находится в очереди, оно может быть уже пустым!

Если вы хотите отслеживать, что происходит с вашими очередями, я предлагаю другой подход. - Напишите свой собственный монитор в реальном времени . В этом примере просто фиксируются события с невыполненными задачами, но вы должны иметь возможность легко изменять его, чтобы захватывать все события, которые вас интересуют, и собирать данные об этих задачах (очередь, время, хост, на котором оно было выполнено, и т. Д. c). Очевидно, - пример того, как это делается в более серьезном проекте.

...