задача получила неожиданный аргумент "dag" в потоке воздуха - PullRequest
0 голосов
/ 10 мая 2019

Я создал задачу с PythonOperator в качестве оператора.Он вызывает функцию в другой папке с аргументом.Но оператор не принимает аргумент dag=dag, когда на самом деле он является обязательным, поскольку он используется для указания на контекст dag.

dags/
- my_dag.py
  sub_folder/
  - __init__.py
  - my_functions.py

Мой DAG содержит task1 и task2.Они будут вызывать функцию из подпапки и передавать аргумент для печати. ​​

my_dag.py

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

from sub_folder.my_functions import task1, task2

args = {
    'owner': 'hello',
    'start_date': dt.datetime(2019, 1, 1),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=2)
}

dag = DAG(
    'try',
    default_args = args,
    schedule_interval = dt.timedelta(minutes=2))

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    provide_context=True,
    op_kwargs={'idx': "Hello "},
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    provide_context=True,
    op_kwargs={'idx': "World!"},
    dag=dag,
)

task1 >> task2

Вызываемые функции - это просто простые функции, которые печатают передаваемый в них аргумент.

my_functions.py

def task1(idx):
    print(f"Task 1! {idx}")

def task2(idx):
    print(f"Task 2! {idx}")

Моя задача 1 всегда повторяется, и через некоторое время происходит сбой.Я заглянул в журналы, чтобы узнать, что происходит.Я обнаружил, что это получает

TypeError: task1() got an unexpected keyword argument 'dag'

Я не знаю, что здесь происходит.Очевидно, я должен вызвать dag = dag, и это действительно аргумент для того, чтобы заставить оператор указать, с каким контейнером dag он должен иметь контекст.

1 Ответ

2 голосов
/ 10 мая 2019

Существует конфликт между my_functions.task1 и PythonOperator с именем task1

попробовать:

import sub_folder.my_functions as mf  # changed

task1 = PythonOperator(
    task_id='task1',
    python_callable=mf.task1,  # changed
    provide_context=True,
    op_kwargs={'idx': "Hello "},
    dag=dag,
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...