Вроде нужно какое-то ветвление. В приведенном ниже примере я использовал BranchPythonOperator
для выполнения функции, которая пытается создать новую подписку и вернуть строку, информирующую, была ли задача выполнена успешно или неудачно. После этого я использовал два PythonOperators
с task_id, соответствующими строкам, возвращаемым BranchPythonOperator, и определил зависимость между start_op и success и fail tasks
from airflow.contrib.hooks import gcp_pubsub_hook
from airflow.operators import python_operator
from airflow import models
def print_context1(ds, **kwargs):
return 'THE TASK SUCCEDED'
def print_context2(ds, **kwargs):
return 'THE TASK FAILED'
def start_function(ds, **kwargs):
try:
response = gcp_pubsub_hook.PubSubHook().create_subscription(
topic_project="my-project",
topic="beam",
subscription="beam_sub2",
subscription_project=None,
ack_deadline_secs=10,
fail_if_exists=True)
return "succeeded"
except gcp_pubsub_hook.PubSubException:
return "failed"
default_dag_args = {
...
}
with models.DAG(
'pubsub_airflow',
default_args=default_dag_args) as dag:
start_op = python_operator.BranchPythonOperator(
task_id='start',
provide_context=True,
python_callable=start_function
)
success = python_operator.PythonOperator(
task_id='succeeded',
provide_context=True,
python_callable=print_context1
)
fail = python_operator.PythonOperator(
task_id='failed',
provide_context=True,
python_callable=print_context2
)
start_op >> [success,fail]
В вашем случае вы можете использовать этот код в качестве основы и заменить два PythonOperators
на ваш DummyOperator и ваш датчик.