Есть несколько вариантов, как этого добиться. Рекомендуемый подход явно запрашивает у работников список зарегистрированных задач с помощью broadcast
управляющей команды:
app = Celery(broker='amqp://')
reply = app.control.broadcast('registered', reply=True)
tasks = set(t for n in reply for w in n.values() for t in w)
if 'my_task_name' in tasks:
# there is an online worker that can handle the task
# => use async mode
app.send_task('my_task_name', args=(...), kwargs={...}, ...)
# or use my_task.apply_async(...)
else:
# there is no online worker for the task
# => call the function directly
my_task(...)
В качестве альтернативы вы можете попробовать отправить задачу напрямую брокеру и проверить результат через короткий промежуток времени. Если по прошествии этого времени задача все еще имеет результат PENDING
, вероятно, нет рабочего, который мог бы справиться с задачей. В таком случае вы можете вызвать задачу как обычную функцию:
app = Celery(broker='amqp://', backend='rpc://')
result = app.send_task('my_task_name', args=(...), kwargs={...}, ..., expiration=5)
time.sleep(5)
if result.state == 'PENDING':
# presumably the task didn't start because there hasn't been any worker
my_task(...)
Однако следует быть осторожным со вторым подходом. Могут быть другие причины, по которым задача находится в состоянии ожидания (например, они заняты и предварительно выбрали максимальное количество задач). Кроме того, в этом примере предполагается, что ваш брокер поддерживает истечение срока действия AMQP (например, RabbitMQ ) и что вы не игнорируете результат задачи.