У меня два мешка test1
и test2
в потоке apache, оба мешка имеют одинаковый start_date
, но разные schedule_intervals
, следовательно, test1
мог бы успешно сработать один раз даже до того, как test2
было запланировано.
Я пытался использовать externalTaskSensor
, который работает только тогда, когда оба графика были запланированы одновременно. Я также пытался TriggerDagRunOperator
, где test1
запускает test2
после завершения, но мое требование - использовать подход на основе опроса test2
.
dag test1 ниже:
dag1 = DAG('test1',
default_args=default_args,
dagrun_timeout = timedelta(minutes=5),
schedule_interval='*/5 * * * *',
start_date = datetime(2020, 1, 21, 17, 10),
max_active_runs=1,
)
t0 = DummyOperator(
task_id = 'start',
dag = dag1
)
t1 = BashOperator(
task_id='t1',
bash_command='echo "t1 from test1"',
dag=dag1)
t2 = BashOperator(
task_id='t2',
bash_command='echo "t2 from test1"',
retries=1,
dag=dag1)
t3 = DummyOperator(
task_id = 'end',
dag = dag1
)
t0>>t1>>t3
t0>>t2>>t3
DAG test2 ниже:
dag2 = DAG('test2',
start_date= datetime(2020, 1, 21, 17, 15),
schedule_interval='*/1 * * * *',
dagrun_timeout = timedelta(minutes=5),
max_active_runs=4)
t0 = DummyOperator(task_id = 'start', dag=dag2)
c1 = ExternalTaskSensor(
task_id='c1',
external_dag_id= 'test1',
external_task_id= 'end',
timeout=300,
mode='poke',
poke_interval=30,
dag = dag2 )
t1 = BashOperator(
task_id='t1',
bash_command='echo "t1 from test2"',
dag=dag2)
t2 = BashOperator(
task_id='t2',
bash_command='echo "t1 from test2"',
dag=dag2)
t0 >> c1
c1 >> t1
c1 >> t2
Мой пример использования: test2
начать выполнение на основе состояния успеха test1
. учитывая, что test1
и test2
не находятся в рабочем состоянии одновременно.
Пример: test1
завершил выполнение в 17:00, если test2
планируется в 18:00. в тот же день он должен начать выполнение только в случае успешного выполнения test1
в эту дату