Задание в том же DAG воздушного потока запускается до того, как будет выполнено предыдущее задание - PullRequest
0 голосов
/ 04 февраля 2020

У нас есть DAG, которая в качестве первой задачи объединяет таблицу (A) в промежуточную таблицу (B). После этого есть задача, которая читает из промежуточной таблицы (B) и записывает в другую таблицу (C).

Однако вторая задача выполняет чтение из агрегированной таблицы (B) до того, как она была выполнена. полностью обновляется, в результате чего таблица C содержит старые данные или иногда она пуста. Airflow по-прежнему регистрирует все как успешное.

Updating table B is done as (pseudo):
delete all rows;
insert into table b
select xxxx from table A;
Task Concurrency is set as 10 
pool size: 5
max_overflow: 10
Using local executor 

Redshift, похоже, имеет очередь фиксации. Может ли быть так, что красное смещение сообщает воздушному потоку, которое оно зафиксировало, когда фиксация фактически все еще находится в очереди, и следующая задача, таким образом, считывает, прежде чем произойдет реальная фиксация?

Мы попытались обернуть обновление таблицы B в транзакция как (псевдо):

begin
delete all rows;
insert into table b
select xxxx from table A;
commit;

Но даже это не работает. По какой-то причине airflow управляет запуском второй задачи до того, как первая задача не будет полностью завершена.

ОБНОВЛЕНИЕ

Оказалось, что в зависимостях произошла ошибка. Нижестоящие задачи ожидали завершения некорректной задачи sh.

. Для дальнейшего использования никогда не будьте на 100% уверены, что вы все проверили. Проверьте и перепроверьте весь поток.

1 Ответ

0 голосов
/ 04 февраля 2020

Вы можете достичь этой цели, установив wait_for_downstream в True.

С https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html:

, когда установлено значение true, Экземпляр задачи X будет ожидать успешного завершения задач сразу после предыдущего экземпляра задачи X перед его выполнением.

Этот параметр можно установить на уровне default_dag_args или на уровне задач (операторов) .

default_dag_args = {
   'wait_for_downstream': True,
}
...