Почему задачи остаются в состоянии «Нет» в Airflow 1.10.2 после trigger_dag - PullRequest
1 голос
/ 11 июля 2019

У меня есть фиктивная группа обеспечения доступности баз данных, которую я хочу начать эпизодически, установив для start_date значение today и присвоив ей интервал планирования равный daily

, вот код DAG:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- airflow: DAG -*-
import logging

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

logger = logging.getLogger("DummyDAG")


def execute_python_function():
    logging.info("HEY YO !!!")
    return True


dag = DAG(dag_id='dummy_dag',
          start_date=datetime.today())

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

py_operator = PythonOperator(task_id='exec_function',
                             python_callable=execute_python_function,
                             dag=dag)

start >> py_operator >> end

В Airflow 1.9.0, когда я выполняю airflow trigger_dag -e 20190701, создается прогон DAG, создаются экземпляры задач, планируются и выполняются.

Однако в Airflow 1.10.2 для прогона DAG также создаются экземпляры задач, но они застряли в состоянии None.

для обеих версий зависимость_on_past равна False

Ниже приведены подробные сведения о задаче start в Airflow 1.9.0 (она выполняется с успехом через некоторое время)

Task Instance Details

Dependencies Blocking Task From Getting Scheduled

Dependency:         Reason
Dagrun Running:     Task instance's dagrun was not in the 'running' state but in the state 'success'.
Task Instance State:    Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
Execution Date:     The execution date is 2019-07-10T00:00:00 but this is before the task's start date 2019-07-11T08:45:18.230876.
Execution Date:     The execution date is 2019-07-10T00:00:00 but this is before the task's DAG's start date 2019-07-11T08:45:18.230876.

Task instance attribute

Attribute   Value
dag_id  dummy_dag
duration    None
end_date    2019-07-10 16:32:10.372976
execution_date  2019-07-10 00:00:00
generate_command    <function generate_command at 0x7fc9fcc85b90>
hostname    airflow-worker-5dc5b999b6-2l5cp
is_premature    False
job_id  None
key     ('dummy_dag', 'start', datetime.datetime(2019, 7, 10, 0, 0))
log     <logging.Logger object at 0x7fca014e7f10>
log_filepath    /home/airflow/gcs/logs/dummy_dag/start/2019-07-10T00:00:00.log
log_url     https://i39907f7014685e91-tp.appspot.com/admin/airflow/log?dag_id=dummy_dag&task_id=start&execution_date=2019-07-10T00:00:00
logger  <logging.Logger object at 0x7fca014e7f10>
mark_success_url    https://i39907f7014685e91-tp.appspot.com/admin/airflow/success?task_id=start&dag_id=dummy_dag&execution_date=2019-07-10T00:00:00&upstream=false&downstream=false
max_tries   0
metadata    MetaData(bind=None)
next_try_number     2
operator    None
pid     180712
pool    None
previous_ti     None
priority_weight     3
queue   default
queued_dttm     None
run_as_user     None
start_date  2019-07-10 16:32:08.483531
state   success
task    <Task(DummyOperator): start>
task_id     start
test_mode   False
try_number  2
unixname    airflow

Task Attributes

Attribute   Value
adhoc   False
dag     <DAG: dummy_dag>
dag_id  dummy_dag
depends_on_past     False
deps    set([<TIDep(Not In Retry Period)>, <TIDep(Previous Dagrun State)>, <TIDep(Trigger Rule)>])
downstream_list     [<Task(PythonOperator): exec_function>]
downstream_task_ids     ['exec_function']
email   None
email_on_failure    True
email_on_retry  True
end_date    None
execution_timeout   None
log     <logging.Logger object at 0x7fc9e2085350>
logger  <logging.Logger object at 0x7fc9e2085350>
max_retry_delay     None
on_failure_callback     None
on_retry_callback   None
on_success_callback     None
owner   Airflow
params  {}
pool    None
priority_weight     1
priority_weight_total   3
queue   default
resources   {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
retries     0
retry_delay     0:05:00
retry_exponential_backoff   False
run_as_user     None
schedule_interval   1 day, 0:00:00
sla     None
start_date  2019-07-11 08:45:18.230876
task_concurrency    None
task_id     start
task_type   DummyOperator
template_ext    []
template_fields     ()
trigger_rule    all_success
ui_color    #e8f7e4
ui_fgcolor  #000
upstream_list   []
upstream_task_ids   []
wait_for_downstream     False

Сведения о задаче запуска в Airflow 1.10.2

Task Instance Details
Dependencies Blocking Task From Getting Scheduled

Dependency  Reason
Execution Date  The execution date is 2019-07-11T00:00:00+00:00 but this is before the task's start date 2019-07-11T08:53:32.593360+00:00.
Execution Date  The execution date is 2019-07-11T00:00:00+00:00 but this is before the task's DAG's start date 2019-07-11T08:53:32.593360+00:00.


Task Instance Attributes

Attribute   Value
dag_id  dummy_dag
duration    None
end_date    None
execution_date  2019-07-11T00:00:00+00:00
executor_config     {}
generate_command    <function generate_command at 0x7f4621301578>
hostname    
is_premature    False
job_id  None
key     ('dummy_dag', 'start', <Pendulum [2019-07-11T00:00:00+00:00]>, 1)
log     <logging.Logger object at 0x7f4624883350>
log_filepath    /home/airflow/gcs/logs/dummy_dag/start/2019-07-11T00:00:00+00:00.log
log_url     https://a15d189066a5c65ee-tp.appspot.com/admin/airflow/log?dag_id=dummy_dag&task_id=start&execution_date=2019-07-11T00%3A00%3A00%2B00%3A00
logger  <logging.Logger object at 0x7f4624883350>
mark_success_url    https://a15d189066a5c65ee-tp.appspot.com/admin/airflow/success?task_id=start&dag_id=dummy_dag&execution_date=2019-07-11T00%3A00%3A00%2B00%3A00&upstream=false&downstream=false
max_tries   0
metadata    MetaData(bind=None)
next_try_number     1
operator    None
pid     None
pool    None
previous_ti     None
priority_weight     3
queue   default
queued_dttm     None
raw     False
run_as_user     None
start_date  None
state   None
task    <Task(DummyOperator): start>
task_id     start
test_mode   False
try_number  1
unixname    airflow


Task Attributes


Attribute   Value
adhoc   False
dag     <DAG: dummy_dag>
dag_id  dummy_dag
depends_on_past     False
deps    set([<TIDep(Previous Dagrun State)>, <TIDep(Trigger Rule)>, <TIDep(Not In Retry Period)>])
downstream_list     [<Task(PythonOperator): exec_function>]
downstream_task_ids     set(['exec_function'])
email   None
email_on_failure    True
email_on_retry  True
end_date    None
execution_timeout   None
executor_config     {}
inlets  []
lineage_data    None
log     <logging.Logger object at 0x7f460b467e10>
logger  <logging.Logger object at 0x7f460b467e10>
max_retry_delay     None
on_failure_callback     None
on_retry_callback   None
on_success_callback     None
outlets     []
owner   Airflow
params  {}
pool    None
priority_weight     1
priority_weight_total   3
queue   default
resources   {'disk': {'_qty': 512, '_units_str': 'MB', '_name': 'Disk'}, 'gpus': {'_qty': 0, '_units_str': 'gpu(s)', '_name': 'GPU'}, 'ram': {'_qty': 512, '_units_str': 'MB', '_name': 'RAM'}, 'cpus': {'_qty': 1, '_units_str': 'core(s)', '_name': 'CPU'}}
retries     0
retry_delay     0:05:00
retry_exponential_backoff   False
run_as_user     None
schedule_interval   1 day, 0:00:00
sla     None
start_date  2019-07-11T08:53:32.593360+00:00
task_concurrency    None
task_id     start
task_type   DummyOperator
template_ext    []
template_fields     ()
trigger_rule    all_success
ui_color    #e8f7e4
ui_fgcolor  #000
upstream_list   []
upstream_task_ids   set([])
wait_for_downstream     False
weight_rule     downstream

1 Ответ

1 голос
/ 11 июля 2019

IMO это не проблема версии. Если вы проверите журналы, вы увидите сообщения вроде:

Дата исполнения:
Дата выполнения: 2019-07-10T00: 00: 00, но это до даты начала задания 2019-07-11T08: 45: 18.230876.

Дата выполнения - это та, которую вы вводите в команду trigger_dag, тогда как дата начала вашей группы DAG меняется, поскольку Python datetime.today() возвращает текущее время. Чтобы увидеть это, вы можете сделать:

airflow@e3bc9a0a7a3e:~$ airflow trigger_dag dummy_dag -e 20190702

А позже перейдите на http://localhost:8080/admin/airflow/task?dag_id=dummy_dag&task_id=start&execution_date=2019-07-02T00%3A00%3A00%2B00%3A00 (или любой соответствующий URL) и обновите страницу. Вы должны увидеть, как Dependency > Execution date меняется каждый раз.

В вашем случае это будет проблематично, поскольку вы пытаетесь вызвать DAG из прошлого. Лучший способ - указать статическую дату или использовать любой из утилит Airflow, чтобы выяснить это:

dag = DAG(dag_id='dummy_dag',
          start_date=datetime(2019, 7, 11, 0, 0))

В противном случае, если вы хотите обработать исторические данные, вы можете использовать airflow backfill


обновление

Запуск DAG по требованию

После уточнения комментариев мы нашли другой способ вызвать DAG по требованию со свойством schedule_interval=None.

...