Интервал расписания воздушного потока с ExternalTaskSensor - PullRequest
0 голосов
/ 28 августа 2018

Я пытаюсь настроить поток воздуха с двумя интервалами с интервалом «каждые 30 минут».

Первый знак ( выдержка ) выглядит так:

extract_table_user_schema >> extract_table_user_data >> finish_extract_table_user

extract dag

Второй знак ( transform ) выглядит так:

sensor_wait_for_finish_extract_table_user >> transform_table_user >> finish_transform_table_user

transform dag

Для обоих дагов я установил schedule_interval в "*/30 * * * *"

ExternalTaskSensor настроен на execution_delta=timedelta(minutes=30)

Мой ожидаемый поток задач будет следующим: сначала запускается extract dag. Затем, после того, как фиктивная задача finish_tranform_table_user выполнена успешно, датчик срабатывает и запускаются задачи в transform .

В первый раз все работает нормально, но когда я жду второго запуска, задачи сами прерываются. Я наблюдал, как extract dag прерывается задачей transform_table_user, которая изменяет имена некоторых столбцов. Это приводит к ошибке extract_table_user_data, поскольку имена столбцов больше не совпадают.

Редактировать: , чтобы быть более точным, каковы ожидаемые и встречающиеся результаты

Во-первых, краткое изложение того, что каждая задача должна делать:

(sql) extract_table_user_schema: копирует схему из пользовательской таблицы

(sql) extract_table_user_data: копирует данные из пользователя исходной таблицы в копию этой таблицы

(пустышка) finish_extract_table_user: пустышка

(датчик) sensor_wait_for_finish_extract_table_user: датчик для ожидания завершения извлечения

(sql) transform_table_user: переименовывает поле столбца с id на user_id

(пустышка) finish_transform_table_user: пустышка

Теперь, при первом запуске dag, результатом является скопированная схема + данные и переименованное поле. Все задачи как успешные Во второй раз, когда запускаются пакеты, одна задача не выполняется, а другая не запускается. Другие все успешны. Задача extract_table_user_data завершается неудачно с сообщением об ошибке; нет такого поля как id. Задание finish_extract_table_user не было выполнено

Полученная таблица представляла собой скопированную схему, но без данных и переименованное поле.

Итак, я предполагаю, что задача transform_table_user была запущена сразу после копирования схемы. Может быть, набор execution_delta=timedelta(minutes=30) как-то не так? Может я неправильно понимаю api doc для сенсора?

Помощь будет принята с благодарностью!

...