Я пытаюсь вернуть список значений из задачи воздушного потока, а затем в другой задаче l oop поверх списка и вызвать BashOperator, используя значение в качестве аргумента сценария python.
Если использование PythonOperator является правильным способом, я бы хотел узнать, как это сделать, но я хочу, чтобы скрипт Python, который я называю, был внешним файлом, а не другим вызываемым в моем Airflow Dag.
Я могу l oop по списку и распечатать значение, однако я не могу понять, как вызвать BashOperator, который я поместил в l oop.
Вот простая версия кода, просто пытающаяся отобразить значение:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
DAG = DAG(
dag_id='locations_test',
start_date=datetime.now(),
schedule_interval='@once'
)
def get_locations_function(**kwargs):
locations = ['one','two','three']
return locations
get_locations_task = PythonOperator(
task_id='get_locations_task',
python_callable=get_locations_function,
provide_context=True,
dag=DAG)
def call_loop_over_locations_function(**kwargs):
ti = kwargs['ti']
locations = ti.xcom_pull(task_ids='get_locations_task')
for loc in locations:
print(loc) #this prints
bash_operator = BashOperator(
task_id='do_things_with_location',
bash_command="echo '%s'" %loc,
xcom_push=True,
dag=DAG)
#this doesn't get called, i have also tried
#call_loop_over_locations_task >> bash_operator
bash_operator
call_loop_over_locations_task = PythonOperator(
task_id='call_loop_over_locations_task',
python_callable=call_loop_over_locations_function,
provide_context=True,
dag=DAG)
get_locations_task >> call_loop_over_locations_task