Я создал задачу с PythonOperator в качестве оператора.Он вызывает функцию в другой папке с аргументом.Но оператор не принимает аргумент dag=dag
, когда на самом деле он является обязательным, поскольку он используется для указания на контекст dag.
dags/
- my_dag.py
sub_folder/
- __init__.py
- my_functions.py
Мой DAG содержит task1 и task2.Они будут вызывать функцию из подпапки и передавать аргумент для печати.
my_dag.py
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from sub_folder.my_functions import task1, task2
args = {
'owner': 'hello',
'start_date': dt.datetime(2019, 1, 1),
'retries': 1,
'retry_delay': dt.timedelta(minutes=2)
}
dag = DAG(
'try',
default_args = args,
schedule_interval = dt.timedelta(minutes=2))
task1 = PythonOperator(
task_id='task1',
python_callable=task1,
provide_context=True,
op_kwargs={'idx': "Hello "},
dag=dag,
)
task2 = PythonOperator(
task_id='task2',
python_callable=task2,
provide_context=True,
op_kwargs={'idx': "World!"},
dag=dag,
)
task1 >> task2
Вызываемые функции - это просто простые функции, которые печатают передаваемый в них аргумент.
my_functions.py
def task1(idx):
print(f"Task 1! {idx}")
def task2(idx):
print(f"Task 2! {idx}")
Моя задача 1 всегда повторяется, и через некоторое время происходит сбой.Я заглянул в журналы, чтобы узнать, что происходит.Я обнаружил, что это получает
TypeError: task1() got an unexpected keyword argument 'dag'
Я не знаю, что здесь происходит.Очевидно, я должен вызвать dag = dag, и это действительно аргумент для того, чтобы заставить оператор указать, с каким контейнером dag он должен иметь контекст.