Я схожу с ума, пытаясь найти надежный и проверяемый способ получить количество задач, содержащихся в данной очереди Celery. Я уже читал эти 2 обсуждения на SO:
Django Сельдерей получает счетчик задач (только примечание: я не использую Django или какой-либо Python веб-фреймворк )
Получить список задач в очереди в Celery
Но я не смог решить мою проблему, следуя нескольким советам. Я использую Redis в качестве бэкэнда, но мне бы хотелось иметь независимое от бэкенда и гибкое решение, особенно для тестов. Это моя текущая ситуация: я определил класс EnhancedCelery
, который наследуется от Celery
и добавляет пару методов, в частности get_queue_size()
- это тот, который я пытаюсь правильно реализовать / проверить. Ниже приведен код в моем тестовом примере:
celery_test_app = EnhancedCelery(__name__)
# this is needed to avoid exception for ping command
# which is automatically triggered by the worker once started
celery_test_app.loader.import_module('celery.contrib.testing.tasks')
# in memory backend
celery_test_app.conf.broker_url = 'memory://'
celery_test_app.conf.result_backend = 'cache+memory://'
# We have to setup queues manually,
# since it seems that auto queue creation doesn't work in tests :(
celery_test_app.conf.task_create_missing_queues = False
celery_test_app.conf.task_default_queue = 'default'
celery_test_app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('queue_1', routing_key='q1'),
Queue('queue_2', routing_key='q2'),
Queue('queue_3', routing_key='q3'),
)
celery_test_app.conf.task_default_exchange = 'tasks'
celery_test_app.conf.task_default_exchange_type = 'topic'
celery_test_app.conf.task_default_routing_key = 'task.default'
celery_test_app.conf.task_routes = {
'sample_task': {
'queue': 'default',
'routing_key': 'task.default',
},
'sample_task_in_queue_1': {
'queue': 'queue_1',
'routing_key': 'q1',
},
'sample_task_in_queue_2': {
'queue': 'queue_2',
'routing_key': 'q2',
},
'sample_task_in_queue_3': {
'queue': 'queue_3',
'routing_key': 'q3',
},
}
@celery_test_app.task()
def sample_task():
return 'sample_task_result'
@celery_test_app.task(queue='queue_1')
def sample_task_in_queue_1():
return 'sample_task_in_queue_1_result'
@celery_test_app.task(queue='queue_2')
def sample_task_in_queue_2():
return 'sample_task_in_queue_2_result'
@celery_test_app.task(queue='queue_3')
def sample_task_in_queue_3():
return 'sample_task_in_queue_3_result'
class EnhancedCeleryTest(TestCase):
def test_get_queue_size_returns_expected_value(self):
def add_task(task):
task.apply_async()
with start_worker(celery_test_app):
for _ in range(7):
add_task(sample_task_in_queue_1)
for _ in range(4):
add_task(sample_task_in_queue_2)
for _ in range(2):
add_task(sample_task_in_queue_3)
self.assertEqual(celery_test_app.get_queue_size('queue_1'), 7)
self.assertEqual(celery_test_app.get_queue_size('queue_2'), 4)
self.assertEqual(celery_test_app.get_queue_size('queue_3'), 2)
Это попытки реализации get_queue_size()
.1, которые всегда возвращают ноль (jobs == 0)
def get_queue_size(self, queue_name: str) -> Optional[int]:
with self.connection_or_acquire() as connection:
channel = connection.default_channel
try:
name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
return jobs
except (ChannelError, NotFound):
pass
.2 всегда возвращает ноль
def get_queue_size(self, queue_name: str) -> Optional[int]:
inspection = self.control.inspect()
return inspection.active() # zero!
# or:
return inspection.scheduled() # zero!
# or:
return inspection.reserved() # zero!
.3 он работает, возвращая ожидаемое число для каждой очереди, но только в тесте (потому что свойство «очереди» не существует при использовании redis)
def get_queue_size(self, queue_name: str) -> Optional[int]:
with self.connection_or_acquire() as connection:
channel = connection.default_channel
if hasattr(channel, 'queues'):
queue = channel.queues.get(queue_name)
if queue is not None:
return queue.unfinished_tasks