Вызов BashOperator из Python, вызываемого с помощью XCom Pull in Airflow Dag - PullRequest
0 голосов
/ 05 февраля 2020

Я пытаюсь вернуть список значений из задачи воздушного потока, а затем в другой задаче l oop поверх списка и вызвать BashOperator, используя значение в качестве аргумента сценария python.

Если использование PythonOperator является правильным способом, я бы хотел узнать, как это сделать, но я хочу, чтобы скрипт Python, который я называю, был внешним файлом, а не другим вызываемым в моем Airflow Dag.

Я могу l oop по списку и распечатать значение, однако я не могу понять, как вызвать BashOperator, который я поместил в l oop.

Вот простая версия кода, просто пытающаяся отобразить значение:

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime


DAG = DAG(
    dag_id='locations_test',
    start_date=datetime.now(),
    schedule_interval='@once'
)


def get_locations_function(**kwargs):

    locations = ['one','two','three']

    return locations


get_locations_task = PythonOperator(
    task_id='get_locations_task',
    python_callable=get_locations_function,
    provide_context=True,
    dag=DAG)


def call_loop_over_locations_function(**kwargs):
    ti = kwargs['ti']
    locations = ti.xcom_pull(task_ids='get_locations_task')
    for loc in locations:
        print(loc) #this prints
        bash_operator = BashOperator(
            task_id='do_things_with_location',
            bash_command="echo '%s'" %loc,
            xcom_push=True,
            dag=DAG)

    #this doesn't get called, i have also tried 
    #call_loop_over_locations_task >> bash_operator

    bash_operator 


call_loop_over_locations_task = PythonOperator(
    task_id='call_loop_over_locations_task',
    python_callable=call_loop_over_locations_function,
    provide_context=True,
    dag=DAG)


get_locations_task >> call_loop_over_locations_task

1 Ответ

0 голосов
/ 06 февраля 2020

**kwargs в вашем call_loop_over_locations_function() на самом деле является контекстом. Вы можете использовать его для выполнения bash оператора.

код

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime


DAG = DAG(
    dag_id='locations_test',
    start_date=datetime(2020, 1, 1),
    schedule_interval=None
)


def get_locations_function(**kwargs):

    locations = ['one','two','three']

    return locations


get_locations_task = PythonOperator(
    task_id='get_locations_task',
    python_callable=get_locations_function,
    provide_context=True,
    dag=DAG)


def call_loop_over_locations_function(**kwargs):
    print(kwargs)
    ti = kwargs.get('ti')
    locations = ti.xcom_pull(task_ids='get_locations_task')
    for loc in locations:
        print(loc) #this prints
        bash_operator = BashOperator(
            task_id='do_things_with_location',
            bash_command="echo '%s'" %loc,
            xcom_push=True,
            dag=DAG)
        bash_operator.execute(context=kwargs)

    #this doesn't get called, i have also tried 
    #call_loop_over_locations_task >> bash_operator

    bash_operator 


call_loop_over_locations_task = PythonOperator(
    task_id='call_loop_over_locations_task',
    python_callable=call_loop_over_locations_function,
    provide_context=True,
    dag=DAG)


get_locations_task >> call_loop_over_locations_task
...