Не удается добавить задачу в функцию, которая возвращает конвейер (Airflow, DAG) - PullRequest
0 голосов
/ 28 марта 2019

У меня есть функция, которую мы в основном используем для добавления в конец каждого DAG. Давайте назовем это finalize , и это выглядит как следующий Common класс (конечно, в действительности он делает полезные вещи). И то, что мы обычно делаем, это почти для каждого DAG, до конца мы добавляем код вроде

task_1 >> task_2 >> ... task_n >> common.finalize

и в результате получаем

task_1 >> task_2 >> ... task_n >> a >> b >> c .

Пока все хорошо. Однако теперь для одного из DAG я хочу добавить задачу после finalize . Я не хочу касаться функции финализации и кода

task_1 >> task_2 >> ... task_n >> common.finalize >> task_new

мне не помогает, потому что task_new начинается сразу после a . Но я хочу, чтобы task_new был выполнен после c . Какие-либо предложения? Заранее спасибо.

class Common(object):
    def __init__(self, dag):
        self.dag = dag

    @property
    def finalize(self):
        a = BashOperator(
            task_id='echo_dag',
            bash_command='echo "dag"',
            dag=self.dag)

        b = BashOperator(
            task_id='echo_has_completed',
            bash_command='echo "has completed"',
            dag=self.dag)

        c = BashOperator(
            task_id='echo_successfully',
            bash_command='echo "successfully"',
            dag=self.dag)

        a >> b >> c
        return a

1 Ответ

0 голосов
/ 28 марта 2019

Возможно, я что-то упускаю, но звучит так, как будто вы можете установить trigger_rule на значение *1002*, равное all_success, чтобы убедиться, что все последующие задачи были успешно завершены до его запуска, а затем организовать такие задачи, как это:

[task_1, task_2, task_n, common.finalize] >> [task_new]

...