Ответ @ andscoop хорош, но только для того, чтобы принести больше идей:
Возможное решение 1
Я делаю нечто подобное (зависимости A> B> C), и я решил подход, используя XCOM, выдвинутый по умолчанию предыдущей задачей.
Любое значение, которое возвращает метод execute, сохраняется как сообщение Xcom.
под ключ return_value
. Мы рассмотрим эту тему позже.
Источник http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
# copy&paste it into dags/stackoverflow.py to test it
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG('stackoverflow', description='Another Dag',
schedule_interval='* * * 1 1',
start_date=datetime(2018, 6, 27), catchup=False)
def do_a(**kwargs):
# Assuming that your TASK A is not returning a value
return None
task_a = PythonOperator(task_id='do_a',
python_callable=do_a,
provide_context=True,
dag=dag)
def do_b(**kwargs):
result_from_a = kwargs['ti'].xcom_pull(task_ids='do_a')
if result_from_a:
print("Continue with your second task")
else:
print("Send a notification somewhere, log something or stop the job here.")
task_b = PythonOperator(task_id='do_b',
python_callable=do_b,
provide_context=True,
dag=dag)
task_a >> task_b
Возможное решение 2
ветвление. Более изощренным способом (и с использованием лучших практик) вы можете выполнить ветку, чтобы определить следующий шаг / задачу на основе результата t1
. Я не могу сделать правильный пример сейчас, но вот 2 источника, чтобы понять, как это работает с примерами: