После срабатывания dag воздушного потока перехватите дату выполнения и продолжайте проверять состояние dag_status каждые несколько секунд и возвращайте, если статус «успешен» или «не выполнен».
вот сценарий, который я написал:
try_log.py
import re
import os
import sys
import time
f = open('log.txt', 'r')
outs = f.readlines()
regex = r"Created.*?@(.*?)\s(.*?)manual"
matches = re.findall(regex, str(outs), re.MULTILINE)
dag_exec_date = matches[0][1].strip()[0:-1]
status = 'running'
while status not in ['failed', 'success']:
stream = os.popen("airflow dag_state dag_id '{}'".format(dag_exec_date))
output = stream.read()
lines = output.split('\n')
status = lines[-2].strip()
if status not in ['failed', 'success']:
time.sleep(60)
else:
print (status)
exit()
в Bamboo S SH добавлено следующее ниже для выполнения триггера воздушного потока и ожидания завершения состояния
airflow trigger_dag ${bamboo.dag_id} > /path/log.txt
outputs=$(python try_log.py)
if [ "$outputs" = "failed" ]; then
echo "status failed. Exiting and failing build"
exit 125
fi