Я использую Python 3.6.6 и Сельдерей 4.2.0.
Я пытаюсь управлять рабочими процессами динамических задач , которые могут измениться на лету. Рабочие процессы могут содержать длинные и короткие шаги.
Например:
Изначально у меня есть следующий рабочий процесс:
Начальный рабочий процесс
Но в какой-то момент Я должен добавить еще одну задачу, которая зависит от A . Так что задача может ждать до конца:
Требуемый рабочий процесс
from __future__ import absolute_import
from celery import subtask, signals
from pymemcache.client import base
from test_celery.celery import app
import time
def get_task_uuid(task):
return str(hash(frozenset(task[0], task[1]))))
@app.task
def add(x, y):
print('add({},{}) = {} | {}'.format(x, y, x+y, time.time()))
return x+y
@app.task
def sub(x, y):
print('sub({},{}) = {} | {}'.format(x, y, x-y, time.time()))
return x-y
@app.task
def mul(x, y):
time.sleep(10)
print('mul({},{}) = {} | {}'.format(x,y,x*y, time.time()))
return x*y
@signals.before_task_publish.connect
def before_task_publish(body, exchange, routing_key, headers, properties, retry_policy, **kw):
task = (body, headers['task'])
uuid = get_task_uuid(task)
Я искал любой возможный подход, пытаясь прослушать сигналы задачи, чтобы заставить D работать, как только задача A будет выполнена успешно (signal.task_success). Есть идеи?