Как я могу вернуть списки из Python Operator в airflow и использовать его в качестве аргумента для последующей задачи в dags - PullRequest
0 голосов
/ 05 ноября 2018

У меня есть 3 задачи для выполнения в том же дагс. В то время как Задача1 возвращает список словарей Задача2 и Задача3 пытаются использовать один элемент словаря из результата, возвращаемого задание 1.

def get_list():
    ....
    return listOfDict

def parse_1(example_dict):
    ...

def parse_2(example_dict):
    ...

dag = DAG('dagexample', default_args=default_args)
data_list = PythonOperator(
task_id='get_lists',
python_callable=get_list,
dag=dag)
for data in data_list:
    sub_task1 = PythonOperator(
        task_id='data_parse1' + data['id'],
        python_callable=parse_1,
        op_kwargs={'dataObject': data},
        dag=dag,
     )
    sub_task2 = PythonOperator(
        task_id='data_parse2' + data['id'],
        python_callable=parse_2,
        op_kwargs={'dataObject': data},
        dag=dag,
     )

Ответы [ 2 ]

0 голосов
/ 06 ноября 2018

Вы должны использовать XCom для передачи переменных / сообщений между различными задачами. Взгляните на этот пример: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py

Для вашего случая это должно быть примерно так:

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True, # This is needed
}


def get_list():
    ....
    return listOfDict

def parse_1(**kwargs):
    ti = kwargs['ti']

    # get listOfDict
    v1 = ti.xcom_pull(key=None, task_ids='get_lists')

    # You can now use this v1 dictionary as a normal python dict
    ...

def parse_2(**kwargs):
    ti = kwargs['ti']

    # get listOfDict
    v1 = ti.xcom_pull(key=None, task_ids='get_lists')
    ...

dag = DAG('dagexample', default_args=default_args)
data_list = PythonOperator(
task_id='get_lists',
python_callable=get_list,
dag=dag)
for data in data_list:
    sub_task1 = PythonOperator(
        task_id='data_parse1' + data['id'],
        python_callable=parse_1,
        op_kwargs={'dataObject': data},
        dag=dag,
     )
    sub_task2 = PythonOperator(
        task_id='data_parse2' + data['id'],
        python_callable=parse_2,
        op_kwargs={'dataObject': data},
        dag=dag,
     )
0 голосов
/ 06 ноября 2018

Вы можете использовать XComs , так как они предназначены для связи между задачами. Если ваш словарь очень большой, то я рекомендую хранить его в виде CSV-файла. Как правило, задачи в Airflow не обмениваются данными между ними, поэтому XComs - это способ их достижения, но они ограничены небольшими объемами данных.

...