Воздушный поток: задача после пропуска задачи BranchPythonOperator - PullRequest
0 голосов
/ 23 января 2019

Я создал BranchPythonOperator, который вызывает 2 задачи в зависимости от условия, например:

typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_create_table = PythonOperator(
    task_id='typicon_create_table',
    python_callable=CreateTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_load_data = PythonOperator(
    task_id='typicon_load_data',
    python_callable=LoadData(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_check_table.set_downstream([typicon_load_data, typicon_create_table])
typicon_create_table.set_downstream(typicon_load_data)

Это CheckTable вызываемый класс:

class CheckTable:
    """
    DAG task to check if table exists or not.
    """

    def __call__(self, **kwargs) -> None:
        pg_hook = PostgresHook(postgres_conn_id="postgres_docker")
        query = "SELECT EXISTS ( \
            SELECT 1 FROM information_schema.tables \
            WHERE table_schema = 'public' \
            AND table_name = 'users');"

        table_exists = pg_hook.get_records(query)[0][0]
        if table_exists:
            return "typicon_load_data"
        return "typicon_create_table"

Проблема в том, что обе задачи пропускаются при запуске задачи typicon_check_table.

Как решить эту проблему?

enter image description here

Ответы [ 2 ]

0 голосов
/ 24 января 2019

Добавьте правило trigger_rule = "all_done" в typicon_check_table, как показано ниже

typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    trigger_rule="all_done",
    dag=typicon_task_dag)
0 голосов
/ 24 января 2019

Задача typicon_load_data имеет typicon_create_table в качестве родителя и по умолчанию trigger_rule all_success, поэтому я не удивлен таким поведением.

Два возможных случая здесь:

  1. CheckTable() возвращает typicon_load_data, затем typicon_create_table пропускается, но typicon_load_data, находящийся ниже по потоку, также пропускается.
  2. CheckTable() возвращает typicon_create_table, которое выполнено и вызывает typicon_load_data, который пропускается, потому что это была исключенная ветвь.

Полагаю, ваш скриншот из случая 1.?

...