Привет. Я пытаюсь обработать несколько файлов, используя apache airflow.Я пробовал разные варианты, но в итоге использовал триггердагрунооператор.Таким образом, в основном у меня есть 2 dag, один запланированный dag для проверки файла, и он запускает триггер dag, если файл найден.но я хотел бы повторить это для многих файлов.Проверяйте один файл за раз, если файл существует, добавьте параметры и вызовите триггер с ним.
def conditionally_trigger(context, dag_run_obj):
task_id = context['params']['task_id']
task_instance = context['task_instance']
file_type = task_instance.xcom_pull(task_id, key='file_type')
if file_type is not None and file_type != "":
dag_run_obj.payload = {'file_type': file_type, 'file_name': file_name, 'file_path': full_path}
return dag_run_obj
return None
trigger_dag_run_task = TriggerDagRunOperator(
task_id='trigger_dag_run_task',
trigger_dag_id="trigger_dag",
python_callable=conditionally_trigger,
params={'task_id': check_if_file_exists_task_id},
dag=dag,
)
def execute_check_if_file_exists_task(*args, **kwargs):
input_file_list = ["a","b"]
for item in input_file_list:
full_path = json_data[item]['input_folder_path']
directory = os.listdir(full_path)
for files in directory:
if not re.match(file_name, files):
continue
else:
# true
kwargs['ti'].xcom_push(key='file_type', value=item)
return "trigger_dag_run_task"
#false
return "file_not_found_task"
def execute_file_not_found_task(*args, **kwargs):
logging.info("File Not found path.")
file_not_found_task = PythonOperator(
task_id='file_not_found_task',
retries=3,
provide_context=True,
dag=dag,
python_callable=execute_file_not_found_task,
op_args=[])
check_if_file_exists_task = BranchPythonOperator(
task_id='check_if_file_exists_task',
retries=3,
provide_context=True,
dag=dag,
python_callable=execute_check_if_file_exists_task,
op_args=[])
check_if_file_exists_task.set_downstream(trigger_dag_run_task)
check_if_file_exists_task.set_downstream(file_not_found_task)