создавать задачи в Airflow, перебирая список и передавая аргументы - PullRequest
0 голосов
/ 14 февраля 2019

edit: Это будет работать, я определил ex_func_airflow (var_1 = i), который вызывал проблему

Я хотел бы создавать задачи в потоке воздуха, выполняя зацикливание в списке.

tabs = [1,2,3,4,5]
for i in tabs:
    task = PythonOperator(
    task_id = name,
    provide_context=False,
    op_args  = [i],
    python_callable=ex_func_airflow,
    dag=dag)
    task_0 >> task >> task_1

Когда это выполняется в воздушном потоке, передаваемый аргумент всегда является последним элементом в этом списке.

Итак, я по сути работаю:

ex_func_airflow(6) 

пятьраз вместо выполнения

ex_func_airflow(1)
ex_func_airflow(2)
ex_func_airflow(3)

.. и т. д.

Как я могу передать правильные аргументы для каждой задачи?

1 Ответ

0 голосов
/ 14 февраля 2019

Следующие коды работают для меня.

def print_context(ds, **kwargs):
    print("hello")


def ex_func_airflow(i):
    print(i)


dag = DAG(
    dag_id="loop_dag",
    schedule_interval=None,
    start_date=datetime(2018, 12, 31),
)

task_0 = PythonOperator(
    task_id='task_0',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

task_1 = PythonOperator(
    task_id='task_1',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

tabs = [1, 2, 3, 4, 5]
for i in tabs:
    task_id = f'task_tab_{i}'
    task = PythonOperator(
        task_id=task_id,
        provide_context=False,
        op_args=[i],
        python_callable=ex_func_airflow,
        dag=dag)
    task_0 >> task >> task_1
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...