Я создал BranchPythonOperator, который вызывает 2 задачи в зависимости от условия, например:
typicon_check_table = BranchPythonOperator(
task_id='typicon_check_table',
python_callable=CheckTable(),
provide_context=True,
dag=typicon_task_dag)
typicon_create_table = PythonOperator(
task_id='typicon_create_table',
python_callable=CreateTable(),
provide_context=True,
dag=typicon_task_dag)
typicon_load_data = PythonOperator(
task_id='typicon_load_data',
python_callable=LoadData(),
provide_context=True,
dag=typicon_task_dag)
typicon_check_table.set_downstream([typicon_load_data, typicon_create_table])
typicon_create_table.set_downstream(typicon_load_data)
Это CheckTable
вызываемый класс:
class CheckTable:
"""
DAG task to check if table exists or not.
"""
def __call__(self, **kwargs) -> None:
pg_hook = PostgresHook(postgres_conn_id="postgres_docker")
query = "SELECT EXISTS ( \
SELECT 1 FROM information_schema.tables \
WHERE table_schema = 'public' \
AND table_name = 'users');"
table_exists = pg_hook.get_records(query)[0][0]
if table_exists:
return "typicon_load_data"
return "typicon_create_table"
Проблема в том, что обе задачи пропускаются при запуске задачи typicon_check_table
.
Как решить эту проблему?