Поток воздуха долго работает ежечасно DAG несколько часов - PullRequest
0 голосов
/ 12 марта 2019

Мой DAG планируется запускать каждый час. Я каждый час беру данные из источника s3 и обрабатываю их. Иногда на выполнение задачи уходит больше часа. В то время мне не хватает часа данных.

Пример: 13:00 DAG запускался и работал в течение 2 часов. Так что мой следующий прогон DAG принимает параметр 3 (3 часа дня), пропуская данные 2 часа дня. Другими словами, как мне вызвать задачу и убедиться, что она выполняется каждый час, т.е. 24 раза в день

Ответы [ 2 ]

0 голосов
/ 13 марта 2019

Вот мой DAG

HOUR_PACIFIC = arrow.utcnow().shift(hours=-3).to('US/Pacific').format("HH")

dag = DAG(
    DAG_ID,
    catchup=False,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=5),
    schedule_interval='0 * * * *')

start = DummyOperator(
    task_id='Start',
    dag=dag)

my_task = EMRStep(emr,
'stg',
HOUR_PACIFIC)

end = DummyOperator(
    task_id='End',
    dag=dag
)
start >> my_task >> end
0 голосов
/ 12 марта 2019

Это идеальный сценарий для использования TimeDeltaSensor


Примечание: следующий фрагмент кода приведен только для справки и НЕ был проверен

import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
from airflow.utils.trigger_rule import TriggerRule

# create DAG object
my_dag: DAG = DAG(dag_id="my_dag",
                  start_date=datetime.datetime(year=2019, month=3, day=11),
                  schedule_interval="0 0 0 * * *")

# create dummy begin & end tasks
my_begin_task: DummyOperator = DummyOperator(dag=my_dag,
                                             task_id="my_begin_task")
my_end_task: DummyOperator = DummyOperator(dag=my_dag,
                                           task_id="my_end_task",
                                           trigger_rule=TriggerRule.ALL_DONE)

# populate the DAG
for i in range(1, 24, 1):
    # create sensors and actual tasks for all hours of the day
    my_time_delta_sensor: TimeDeltaSensor = TimeDeltaSensor(dag=my_dag,
                                                            task_id=f"my_time_delta_sensor_task_{i}_hours",
                                                            delta=datetime.timedelta(hours=i))
    my_actual_task: PythonOperator = PythonOperator(dag=my_dag,
                                                    task_id=f"my_actual_task_{i}_hours",
                                                    python_callable=my_callable
                                                    ..)
    # wire-up tasks together
    my_begin_task >> my_time_delta_sensor >> my_actual_task >> my_end_task

Ссылки

...