Данные файлы test1.py
, test2.py
, test3.py
как
# this is `test1.py`
def entry_point_1():
print("entry_point_1")
вы можете создать test_dag.py
.
├── __init__.py
├── test1.py
├── test2.py
├── test3.py
└── test_dag.py
Есть 2 простых подхода
1. Использование PythonOperator
# this is `test_dag.py`
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import test1, test2, test3
dag_python: DAG = DAG(dag_id="dag_using_python_op",
start_date=datetime(year=2019, month=1, day=14),
schedule_interval=None)
python_op_1: PythonOperator = PythonOperator(dag=dag_python,
task_id="python_op_1",
python_callable=test1.entry_point_1)
python_op_2: PythonOperator = PythonOperator(dag=dag_python,
task_id="python_op_2",
python_callable=test2.entry_point_2)
python_op_3: PythonOperator = PythonOperator(dag=dag_python,
task_id="python_op_3",
python_callable=test3.entry_point_3)
python_op_1 >> python_op_2 >> python_op_3
2. Использование BashOperator
# this is `test_dag.py`
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
dag_bash: DAG = DAG(dag_id="dag_using_bash_op",
start_date=datetime(year=2019, month=1, day=14),
schedule_interval=None)
bash_op_1: BashOperator = BashOperator(dag=dag_bash,
task_id="bash_op_1",
bash_command="python -c 'import test1; test1.entry_point_1()")
bash_op_2: BashOperator = BashOperator(dag=dag_bash,
task_id="bash_op_2",
bash_command="python -c 'import test2; test2.entry_point_2()'")
bash_op_3: BashOperator = BashOperator(dag=dag_bash,
task_id="bash_op_3",
bash_command="python -c 'import test3; test3.entry_point_3()'")
bash_op_1 >> bash_op_2 >> bash_op_3
Примечание: вам нужно исправить PYTHONPATH
, чтобы оно заработало; Я не смог этого сделать, но попробую (и сообщу о ваших результатах в комментариях)