Я хочу выполнить цепочку задач после того, как начальные задачи создадут аргументы:
load_arguments () -> группа (process_1 (arg) | process_2 (arg ') для каждого аргумента из первой функции)
Уже есть вопрос по этому поводу, но я не могу его прокомментировать: Цепочка результатов задания сельдерея в распределенную группу
Я воссоздал оба ответа, а оба - нетработа.
@app.task
def dmap(it, callback):
callback = subtask(callback)
return group(subtask(copy.deepcopy(dict(callback))).clone([arg,]) for arg in it)()
@app.task
def irange(n=10):
return list(range(n))
@app.task
def addone(x):
print(f"ADDONE: {x+1}")
return x+1
app.conf.beat_schedule = {
'chained_schedule': {
'task': 'tasks.irange',
'schedule': crontab(),
'options': {
'link': dmap.s(addone.s() | addone.s()),
},
'args': (100,),
}
}
Как добавить цепочку задач в dmap?