Я пытаюсь настроить поток воздуха с двумя интервалами с интервалом «каждые 30 минут».
Первый знак ( выдержка ) выглядит так:
extract_table_user_schema >> extract_table_user_data >> finish_extract_table_user
Второй знак ( transform ) выглядит так:
sensor_wait_for_finish_extract_table_user >> transform_table_user >> finish_transform_table_user
Для обоих дагов я установил schedule_interval в "*/30 * * * *"
ExternalTaskSensor настроен на execution_delta=timedelta(minutes=30)
Мой ожидаемый поток задач будет следующим: сначала запускается extract dag. Затем, после того, как фиктивная задача finish_tranform_table_user
выполнена успешно, датчик срабатывает и запускаются задачи в transform .
В первый раз все работает нормально, но когда я жду второго запуска, задачи сами прерываются.
Я наблюдал, как extract dag прерывается задачей transform_table_user
, которая изменяет имена некоторых столбцов. Это приводит к ошибке extract_table_user_data
, поскольку имена столбцов больше не совпадают.
Редактировать: , чтобы быть более точным, каковы ожидаемые и встречающиеся результаты
Во-первых, краткое изложение того, что каждая задача должна делать:
(sql) extract_table_user_schema
: копирует схему из пользовательской таблицы
(sql) extract_table_user_data
: копирует данные из пользователя исходной таблицы в копию этой таблицы
(пустышка) finish_extract_table_user
: пустышка
(датчик) sensor_wait_for_finish_extract_table_user
: датчик для ожидания завершения извлечения
(sql) transform_table_user
: переименовывает поле столбца с id
на user_id
(пустышка) finish_transform_table_user
: пустышка
Теперь, при первом запуске dag, результатом является скопированная схема + данные и переименованное поле. Все задачи как успешные
Во второй раз, когда запускаются пакеты, одна задача не выполняется, а другая не запускается. Другие все успешны.
Задача extract_table_user_data
завершается неудачно с сообщением об ошибке; нет такого поля как id
.
Задание finish_extract_table_user
не было выполнено
Полученная таблица представляла собой скопированную схему, но без данных и переименованное поле.
Итак, я предполагаю, что задача transform_table_user
была запущена сразу после копирования схемы. Может быть, набор execution_delta=timedelta(minutes=30)
как-то не так? Может я неправильно понимаю api doc для сенсора?
Помощь будет принята с благодарностью!