Воздушный поток говорит мне, что Даг успешно выполнил задание - PullRequest
0 голосов
/ 15 октября 2018

Я использую airflow для организации процесса etl и построил следующую задачу:

for sql_file in sql_files:
  t1 = PythonOperator(
    task_id= sql_file + '_run',
    provide_context=True,
    python_callable=run_sql_from_file,
    op_kwargs={'filename': sql_file + '.sql',
               'connection': connection,
               'logger': logger
               },
    trigger_rule='all_done',
    dag=dag)

Все файлы sql имеют одинаковую структуру и являются вариациями этого сценария:

    delete from databaseY.tableX;

    insert into databaseY.tableX
    select *
    from     databaseZ.tableW as bs
    inner join databaseW.tableY as cd on bs.id_camp = cd.id_camp

Сначала удаляются записи таблицы данных, а затем вставляются новые записи.

Когда я запускаю это задание из воздушного потока, я не получаю никаких ошибок, фактически Airflow говорит мне, что все задания были выполнены успешно.К сожалению, когда dag закончен, некоторые таблицы данных пусты.Я думаю, что это связано с тем, что задача удаляет данные из таблицы данных, но никогда не заканчивает вставку новых данных.Я думаю, что поток воздуха истекает, но я не знаю, где я могу убедиться, что этого не произойдет.

Вопрос: Как мне решить эту проблему?

1 Ответ

0 голосов
/ 16 октября 2018

Вы запускали эту функцию python как асинхронную или синхронизирующую или проверяли состояние работы БД в run_sql_from_file?Если вы запустите его как асинхронный, возможно, он не вернет статус.Я всегда пишу операторы для проверки состояния работы БД в такой ситуации.

...