Воздушный поток Внешний датчик застревает в тыкании - PullRequest
0 голосов
/ 13 октября 2018

Я хочу, чтобы один даг начинался после завершения другого.одно решение использует функцию внешнего датчика, ниже вы можете найти мое решение.проблема, с которой я сталкиваюсь, заключается в том, что зависимый dag застревает при поиске, я проверил этот ответ и убедился, что оба dag работают по одному и тому же расписанию, мой упрощенный код выглядит следующим образом: любая помощь приветствуется,ведущий dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
   'owner': 'airflow',
   'depends_on_past': False,
   'start_date': datetime(2015, 6, 1),
   'retries': 1,
   'retry_delay': timedelta(minutes=5),



 }

 schedule = '* * * * *'

 dag = DAG('leader_dag', default_args=default_args,catchup=False, 
 schedule_interval=schedule)

t1 = BashOperator(
   task_id='print_date',
   bash_command='date',
   dag=dag)

зависимый dag:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),


}
schedule='* * * * *'
dag = DAG('dependent_dag', default_args=default_args, catchup=False, 
schedule_interval=schedule)

 wait_for_task = ExternalTaskSensor(task_id = 'wait_for_task', 
 external_dag_id = 'leader_dag', external_task_id='t1', dag=dag)

 t1 = BashOperator(
     task_id='print_date',
     bash_command='date',
      dag=dag)

 t1.set_upstream(wait_for_task)

журнал для leader_dag: enter image description here

журнал для зависимого dag:

enter image description here

Ответы [ 2 ]

0 голосов
/ 22 октября 2018

Сначала task_id в leader_dag называется print_date, но вы настраиваете dependent_dag с задачей wait_for_task, которая ожидает задачи leader_dag с именем t1.Нет задачи с именем t1.То, что вы назначили в файле py, не имеет значения и не используется в БД "Воздушный поток" и поперек датчиком.Он должен ждать имени задачи print_date.

Во-вторых, ваши журналы не выстраиваются в очередь, в которой запускает leader_dag, вы показываете, что ожидает зависимый_даг.

Наконец, я не могуРекомендуем использовать Airflow для планирования задач каждую минуту.Конечно, не две зависимые задачи вместе.Рассмотрите возможность создания потоковых заданий в другой системе, такой как Spark, или используйте для этого собственную среду Celery или Dask.

Вы также можете избежать использования ExternalTaskSensor, добавив TriggerDagRunOperator в конце вашего leader_dag кзапустите зависимый_даг и удалите из него расписание, установив для schedule_interval значение None.

. В ваших журналах я вижу журнал для лидера 2018-10-13T19: 08: 11.В лучшем случае это будет dagrun для execute_date 2018-10-13 19:07:00, потому что минутный период, начинающийся с 19:07, заканчивается в 19:08, что является самым ранним из запланированных периодов.И я вижу некоторую задержку между планированием и выполнением около 11 секунд , если это так .Однако может быть несколько минут задержки планирования в Airflow.

Я также вижу журнал из dependent_dag, который работает с 19:14:04 до 19:14:34 и ищет завершениесоответствующий 19:13:00 дагрун.Нет никаких признаков того, что ваш планировщик достаточно свободен для запуска dagrun 19:13:00 из leader_dag к 19:14:34.Вы могли бы лучше убедить меня, если бы показывали, что тыкаешь 5 минут или около того.Конечно, никогда не почувствует leader_dag.t1 , потому что это не то, что вы назвали показанными задачами.

Итак, у Airflow есть задержка планирования, если у вас былонесколько тысяч дагов в системе, это может быть больше, чем 1 минута, так что при catchup=False вы получите несколько прогонов, следующих друг за другом IE 19:08, 19:09 и некоторые прогоны, которые пропускают минуту (или 6) может произойти, например, 19:10, за которым следует 19:16, и поскольку задержка является случайной на основе принципа dag-by-dag, вы можете получить трассы без выравнивания с датчиком, ожидающим вечно, даже если у вас есть правильныйИдентификатор задачи для ожидания:

 wait_for_task = ExternalTaskSensor(
     task_id='wait_for_task', 
     external_dag_id='leader_dag',
-    external_task_id='t1',
+    external_task_id='print_date',
     dag=dag)
0 голосов
/ 16 октября 2018

При использовании ExternalTaskSensor вы должны назначить обеим группам DAG одинаковую дату начала.Если это не работает для вашего варианта использования, вам нужно использовать execution_delta или execution_date_fn в вашем ExternalTaskSensor.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...