Сельдерей определяет рабочие процессы на лету - PullRequest
0 голосов
/ 05 июля 2018

Я использую Python 3.6.6 и Сельдерей 4.2.0.

Я пытаюсь управлять рабочими процессами динамических задач , которые могут измениться на лету. Рабочие процессы могут содержать длинные и короткие шаги.

Например: Изначально у меня есть следующий рабочий процесс:

Начальный рабочий процесс

Но в какой-то момент Я должен добавить еще одну задачу, которая зависит от A . Так что задача может ждать до конца:

Требуемый рабочий процесс

   from __future__ import absolute_import
   from celery import subtask, signals
   from pymemcache.client import base
   from test_celery.celery import app
   import time

   def get_task_uuid(task):
      return str(hash(frozenset(task[0], task[1]))))

   @app.task
   def add(x, y):
       print('add({},{}) = {} | {}'.format(x, y, x+y, time.time()))

       return x+y

    @app.task
    def sub(x, y):
       print('sub({},{}) = {} | {}'.format(x, y, x-y, time.time()))
       return x-y

    @app.task
    def mul(x, y):
       time.sleep(10)
       print('mul({},{}) = {} | {}'.format(x,y,x*y, time.time()))
       return x*y

     @signals.before_task_publish.connect
     def before_task_publish(body, exchange, routing_key, headers, properties,  retry_policy, **kw):
         task = (body, headers['task'])
         uuid = get_task_uuid(task)

Я искал любой возможный подход, пытаясь прослушать сигналы задачи, чтобы заставить D работать, как только задача A будет выполнена успешно (signal.task_success). Есть идеи?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...