Я использую airflow для организации процесса etl и построил следующую задачу:
for sql_file in sql_files:
t1 = PythonOperator(
task_id= sql_file + '_run',
provide_context=True,
python_callable=run_sql_from_file,
op_kwargs={'filename': sql_file + '.sql',
'connection': connection,
'logger': logger
},
trigger_rule='all_done',
dag=dag)
Все файлы sql имеют одинаковую структуру и являются вариациями этого сценария:
delete from databaseY.tableX;
insert into databaseY.tableX
select *
from databaseZ.tableW as bs
inner join databaseW.tableY as cd on bs.id_camp = cd.id_camp
Сначала удаляются записи таблицы данных, а затем вставляются новые записи.
Когда я запускаю это задание из воздушного потока, я не получаю никаких ошибок, фактически Airflow говорит мне, что все задания были выполнены успешно.К сожалению, когда dag закончен, некоторые таблицы данных пусты.Я думаю, что это связано с тем, что задача удаляет данные из таблицы данных, но никогда не заканчивает вставку новых данных.Я думаю, что поток воздуха истекает, но я не знаю, где я могу убедиться, что этого не произойдет.
Вопрос: Как мне решить эту проблему?