Как обрабатывать условные точки принятия решения в Apache Airflow? - PullRequest
1 голос
/ 05 мая 2020

Я в процессе преобразования рабочего процесса, который у меня есть в настоящее время, в Apache задачи воздушного потока. Рабочий процесс выглядит следующим образом:

1. Run set of tests.  
2. Check results
  - If the tests pass then move to the next set of tests.  
  - If the tests fail, log the failure information (time, reason, what test was ran) in a database.    
3. Send message out about failures. 

Прямо сейчас у меня выполняется задача PythonOperator, которая запускает вызываемый объект, который выполняет набор тестов на основе информации, хранящейся в MongoDB.

test_info_conn = MongoHook(conn_id='test_selector_mongo')
test_list = test_info_conn.find('test_selector_metadata', None).limit(2)

for count, current_test in enumerate(test_list):
    status_test_task = PythonOperator(
                     task_id='status_test_'+str(count),
                     python_callable=run_status_tests,
                     op_kwargs={'current_test':current_test},
                     provide_context=True,
                     dag=dag)

Это тот момент, когда я не уверен, что делать дальше. Могу ли я связать другую задачу, у которой есть вызываемый объект для обработки результатов, и как мне получить результаты для этой следующей задачи в цепочке?

1 Ответ

0 голосов
/ 05 мая 2020
  • Для разветвления рабочего процесса (2-й шаг) вы можете использовать BranchPythonOperator
  • Для обмена информацией (журнала) между задачами (2–3 шаг) Airflow имеет XCOM s (xcom_pull() функция )

Ссылки

...