Сельдерей: Взаимодействовать / Общаться с запущенным заданием - PullRequest
2 голосов
/ 18 января 2020

Здесь появляется связанный (хотя и не идентичный) вопрос: Взаимодействовать с текущей задачей сельдерея

Легко запустить задачу и получить ее уникальный идентификатор:

async_result = my_task.delay()
task_id = async_result.task_id

Легко передать сообщение, которое достигнет пользовательской команды в работнике:

my_celery_app.control.broadcast('custom_command', arguments= {'id': task_id})

Проблема возникает в том, что работник запускается в виде небольшого дерева процессов, состоящего из одного руководителя и числа детей. Супервизор получает сообщения и действует на них, делегируя выполнение задач дочерним элементам, но в супервизоре выполняются пользовательские команды.

Я не могу найти четкой документации или средств, с помощью которых пользовательская команда, выполняющаяся в супервизоре, может отправить сообщение. указать c дочерний элемент, который выполняет заданную задачу, или даже передать сообщение всем дочерним элементам, чтобы дочерний элемент, выполняющий определенную задачу, мог на него воздействовать.

Супервизор делегирует работу дочерним элементам и имеет некоторые линия связи открыта, но я не могу найти никаких документально подтвержденных способов ее использования.

Можно написать пользовательскую пару элементов управления и задач, которые взаимодействуют друг с другом независимо от отношений с Celery, используя либо kombu (как это делает Celery), либо каким-либо другим способом (python 3.8 заманчиво открывает возможность совместной памяти). объекты).

Но было бы гораздо более уместным и логичным, если бы у Celery был простой API-интерфейс, с помощью которого пользовательский элемент управления мог бы передавать сообщение выполняющейся задаче (либо дочернему процессу, выполняющему его, либо транслировать всем дочерним элементам с task_id в качестве параметра, позволяющего правильно воздействовать на него).

Возможно, что:

  1. Такой API существует в Celery, и я не нашел его
  2. Существует скрытое средство доступа к функциям Celery для отправки такого сообщения
  3. В Celery такого средства не существует

Это общая проблема c, и она будет полезна, если огромная полезность популярности Celery, если бы у нас были какие-то средства взаимодействия с запущенной задачей данного task_id, чтобы отправлять ей сообщения.

Бэкэнд-результат предоставляет средства для получения сообщений от запущенных задач. То, что кажется неясным или отсутствует, - это то, как отправлять сообщения выполняющимся задачам.

Пример:

В конфигурации по умолчанию (т.е. с использованием конфигурации пула преформ), если я запускаю celery -A myapp worker он генерирует процессы, подобные этому pstree output:

[celeryd: celer(19955)─┬─[celeryd: celer(19957)
                       ├─[celeryd: celer(19958)
                       ├─[celeryd: celer(19959)
                       ├─[celeryd: celer(19960)
                       ├─[celeryd: celer(19961)
                       ├─[celeryd: celer(19962)
                       ├─[celeryd: celer(19963)
                       └─[celeryd: celer(19964)

То есть рабочий (супервизор) запускает 8 дочерних элементов (предварительно созданный пул), и именно супервизор получает сообщения (через RabbitMQ). .. заметно, чтобы начать задачу. Затем он (я предполагаю) отправляет сообщение (используя kombu, который, по-видимому, также использует RabbitMQ) одному из детей в пуле для запуска задачи.

Теперь задача заключается в том, чтобы клиент, запустивший задачу, которая знает идентификатор задачи, взаимодействовал с реально выполняющейся функцией в дочернем процессе.

Мы можем пройти половину пути с помощью настраиваемой команды управления:

http://docs.celeryproject.org/en/latest/userguide/workers.html#writing -your-own-remote-control-command

Но эта команда выполняется на работнике (супервизоре) и поэтому Для связи с запущенной функцией (отправьте запрос) нам необходим механизм для этого. И вопрос в том, предоставляет ли Celery тот, который я не нашел, ни документированный, либо нет (внутренний), а если нет, то кто-нибудь реализовал другое средство?

...