Я пытаюсь запускать задачи независимо и параллельно,
Я выгляжу как-то так:
---> patternA ---> file1a
---> file2a
---> file3a
sensor ---> move_csv ---> result_mv ---> rerun_dag
---> patternB ---> file1b
---> file2b
---> file3b
my dag.py:
sensor = FileSensor(
task_id="sensor ",
filepath=filePath,
fs_conn_id='airflow_db',
poke_interval=10,
dag=dag,
)
move_csv = BranchPythonOperator(
task_id='move_csv',
python_callable=moveCsvFile,
trigger_rule='none_failed',
dag=dag,
)
result_mv = BranchPythonOperator(
task_id='result_mv',
python_callable=result,
trigger_rule='none_failed',
dag=dag,
)
pattern_A = DummyOperator(
task_id="pattern_A ",
dag=dag,
)
pattern_B = DummyOperator(
task_id="pattern_B ",
dag=dag,
)
file1 = BashOperator(
task_id="file1a ",
bash_command='python3 '+scriptPath+'file1.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file2 = BashOperator(
task_id="file2a",
bash_command='python3 '+scriptPath+'file2.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file3 = BashOperator(
task_id="file3a",
bash_command='python3 '+scriptPath+'file3.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file1 = BashOperator(
task_id="file1b ",
bash_command='python3 '+scriptPath+'file1b.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file2 = BashOperator(
task_id="file2b",
bash_command='python3 '+scriptPath+'file2b.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file3 = BashOperator(
task_id="file3b",
bash_command='python3 '+scriptPath+'file3b.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
move_csv.set_upstream(sensor)
result_mv.set_upstream(move_csv)
patternA.set_upstream(result_mv)
patternB.set_upstream(result_mv)
file1a.set_upstream(patternA)
file2a.set_upstream(patternA)
file3a.set_upstream(patternA)
file1b.set_upstream(patternB)
file2b.set_upstream(patternB)
file3b.set_upstream(patternB)
rerun.set_uptstream( from all file ...)
каков наилучший способ в patternA пропустить file2a и file3a, если у меня есть только file1a, соответствующий шаблону? И если у меня есть соответствие file1a и file2a, я бы хотел запустить их параллельно и пропустить file3a.
Моя задача файлов выполняет вызов сценария python с BashOperator.
Спасибо для помощи ! :)