Создание конвейера для 5 скриптов Python в зависимости от предыдущих задач - PullRequest
0 голосов
/ 15 января 2019

Я хочу создать планировщик конвейера с использованием воздушного потока, который будет выполнять 5 сценариев Python, хранящихся в домашнем каталоге. Сценарии Python: test1.py, test2.py, test3.py test4.py и final.py. Как я должен загружать скрипты в поток воздуха, Кто-нибудь может мне помочь с фрагментом кода. Я новичок в воздушном потоке, я пробовал учебники, но я не могу понять, используя планировщик.

Пожалуйста, не дублируйте этот вопрос, мне действительно нужно понять.

1 Ответ

0 голосов
/ 15 января 2019

Данные файлы test1.py, test2.py, test3.py как

# this is `test1.py`
def entry_point_1():
    print("entry_point_1")

вы можете создать test_dag.py

.
├── __init__.py
├── test1.py
├── test2.py
├── test3.py
└── test_dag.py

Есть 2 простых подхода

1. Использование PythonOperator

# this is `test_dag.py`
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

import test1, test2, test3

dag_python: DAG = DAG(dag_id="dag_using_python_op",
                      start_date=datetime(year=2019, month=1, day=14),
                      schedule_interval=None)
python_op_1: PythonOperator = PythonOperator(dag=dag_python,
                                             task_id="python_op_1",
                                             python_callable=test1.entry_point_1)
python_op_2: PythonOperator = PythonOperator(dag=dag_python,
                                             task_id="python_op_2",
                                             python_callable=test2.entry_point_2)
python_op_3: PythonOperator = PythonOperator(dag=dag_python,
                                             task_id="python_op_3",
                                             python_callable=test3.entry_point_3)
python_op_1 >> python_op_2 >> python_op_3

2. Использование BashOperator

# this is `test_dag.py`
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

dag_bash: DAG = DAG(dag_id="dag_using_bash_op",
                    start_date=datetime(year=2019, month=1, day=14),
                    schedule_interval=None)
bash_op_1: BashOperator = BashOperator(dag=dag_bash,
                                       task_id="bash_op_1",
                                       bash_command="python -c 'import test1; test1.entry_point_1()")
bash_op_2: BashOperator = BashOperator(dag=dag_bash,
                                       task_id="bash_op_2",
                                       bash_command="python -c 'import test2; test2.entry_point_2()'")
bash_op_3: BashOperator = BashOperator(dag=dag_bash,
                                       task_id="bash_op_3",
                                       bash_command="python -c 'import test3; test3.entry_point_3()'")
bash_op_1 >> bash_op_2 >> bash_op_3

Примечание: вам нужно исправить PYTHONPATH, чтобы оно заработало; Я не смог этого сделать, но попробую (и сообщу о ваших результатах в комментариях)


...