Здесь появляется связанный (хотя и не идентичный) вопрос: Взаимодействовать с текущей задачей сельдерея
Легко запустить задачу и получить ее уникальный идентификатор:
async_result = my_task.delay()
task_id = async_result.task_id
Легко передать сообщение, которое достигнет пользовательской команды в работнике:
my_celery_app.control.broadcast('custom_command', arguments= {'id': task_id})
Проблема возникает в том, что работник запускается в виде небольшого дерева процессов, состоящего из одного руководителя и числа детей. Супервизор получает сообщения и действует на них, делегируя выполнение задач дочерним элементам, но в супервизоре выполняются пользовательские команды.
Я не могу найти четкой документации или средств, с помощью которых пользовательская команда, выполняющаяся в супервизоре, может отправить сообщение. указать c дочерний элемент, который выполняет заданную задачу, или даже передать сообщение всем дочерним элементам, чтобы дочерний элемент, выполняющий определенную задачу, мог на него воздействовать.
Супервизор делегирует работу дочерним элементам и имеет некоторые линия связи открыта, но я не могу найти никаких документально подтвержденных способов ее использования.
Можно написать пользовательскую пару элементов управления и задач, которые взаимодействуют друг с другом независимо от отношений с Celery, используя либо kombu (как это делает Celery), либо каким-либо другим способом (python 3.8 заманчиво открывает возможность совместной памяти). объекты).
Но было бы гораздо более уместным и логичным, если бы у Celery был простой API-интерфейс, с помощью которого пользовательский элемент управления мог бы передавать сообщение выполняющейся задаче (либо дочернему процессу, выполняющему его, либо транслировать всем дочерним элементам с task_id в качестве параметра, позволяющего правильно воздействовать на него).
Возможно, что:
- Такой API существует в Celery, и я не нашел его
- Существует скрытое средство доступа к функциям Celery для отправки такого сообщения
- В Celery такого средства не существует
Это общая проблема c, и она будет полезна, если огромная полезность популярности Celery, если бы у нас были какие-то средства взаимодействия с запущенной задачей данного task_id, чтобы отправлять ей сообщения.
Бэкэнд-результат предоставляет средства для получения сообщений от запущенных задач. То, что кажется неясным или отсутствует, - это то, как отправлять сообщения выполняющимся задачам.
Пример:
В конфигурации по умолчанию (т.е. с использованием конфигурации пула преформ), если я запускаю celery -A myapp worker
он генерирует процессы, подобные этому pstree
output:
[celeryd: celer(19955)─┬─[celeryd: celer(19957)
├─[celeryd: celer(19958)
├─[celeryd: celer(19959)
├─[celeryd: celer(19960)
├─[celeryd: celer(19961)
├─[celeryd: celer(19962)
├─[celeryd: celer(19963)
└─[celeryd: celer(19964)
То есть рабочий (супервизор) запускает 8 дочерних элементов (предварительно созданный пул), и именно супервизор получает сообщения (через RabbitMQ). .. заметно, чтобы начать задачу. Затем он (я предполагаю) отправляет сообщение (используя kombu, который, по-видимому, также использует RabbitMQ) одному из детей в пуле для запуска задачи.
Теперь задача заключается в том, чтобы клиент, запустивший задачу, которая знает идентификатор задачи, взаимодействовал с реально выполняющейся функцией в дочернем процессе.
Мы можем пройти половину пути с помощью настраиваемой команды управления:
http://docs.celeryproject.org/en/latest/userguide/workers.html#writing -your-own-remote-control-command
Но эта команда выполняется на работнике (супервизоре) и поэтому Для связи с запущенной функцией (отправьте запрос) нам необходим механизм для этого. И вопрос в том, предоставляет ли Celery тот, который я не нашел, ни документированный, либо нет (внутренний), а если нет, то кто-нибудь реализовал другое средство?