Конвертировать скрипт Python в Airflow PythonOperator (s) - PullRequest
0 голосов
/ 10 июня 2018

У меня есть рабочий скрипт на Python с запусками из CronJob.Я хочу преобразовать его в DAG с PythonOperator(s), как мы сейчас конвертируем в Airflow.

Скажите, что у меня есть функции: a(),b(),c(),d() И порядок их выполнения: a->b->c->d

Позволяетскажем, что коды функций:

def a(): 
    print("Happy")

def b(): 
    print("Birthday")

def c(): 
    print("to")

def d(): 
    print("you!")

** Это всего лишь пример, мой код для всех функций более сложный

У меня есть этот DAG:

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'schedule_interval': '0 10 * * *'
}

dag = DAG(dag_id='example', default_args=args)

a = PythonOperator(task_id='a', dag=dag)
b = PythonOperator(task_id='b', dag=dag)
c = PythonOperator(task_id='c', dag=dag)
d = PythonOperator(task_id='d', dag=dag)

a.set_downstream(b)
b.set_downstream(c)
c.set_downstream(d)

Чего я не понимаю, так это где я могу разместить коды a(),b(),c(),d() и где я могу указать их имена при выполнении PythonOperator.

Можно сказать, что я ищу способпреобразовать мой скрипт Python в Airflow, так как каждая функция будет отдельным оператором.

Я думал, что это должно быть очень просто и просто, но я не нашел никакой информации о том, как это сделать.

1 Ответ

0 голосов
/ 10 июня 2018

В операторе python функция python, которая должна быть выполнена, передается в оператор.Таким образом, вы захотите передать python_callable kwarg следующим образом:

def do_a():
    print('running a')

a = PythonOperator(task_id='a', python_callable=do_a, dag=dag)

Источник для операторов обычно документирует параметры для них. Python оператор документы

...