У меня есть функция, которую мы в основном используем для добавления в конец каждого DAG. Давайте назовем это finalize , и это выглядит как следующий Common класс (конечно, в действительности он делает полезные вещи). И то, что мы обычно делаем, это почти для каждого DAG, до конца мы добавляем код вроде
task_1 >> task_2 >> ... task_n >> common.finalize
и в результате получаем
task_1 >> task_2 >> ... task_n >> a >> b >> c .
Пока все хорошо. Однако теперь для одного из DAG я хочу добавить задачу после finalize . Я не хочу касаться функции финализации и кода
task_1 >> task_2 >> ... task_n >> common.finalize >> task_new
мне не помогает, потому что task_new начинается сразу после a . Но я хочу, чтобы task_new был выполнен после c .
Какие-либо предложения?
Заранее спасибо.
class Common(object):
def __init__(self, dag):
self.dag = dag
@property
def finalize(self):
a = BashOperator(
task_id='echo_dag',
bash_command='echo "dag"',
dag=self.dag)
b = BashOperator(
task_id='echo_has_completed',
bash_command='echo "has completed"',
dag=self.dag)
c = BashOperator(
task_id='echo_successfully',
bash_command='echo "successfully"',
dag=self.dag)
a >> b >> c
return a