Я работаю над некоторым тестом воздушного потока, чтобы отправить состояние задачи в воздушный поток. Если задача не выполнена, я хочу отправить некоторую подробную информацию в свободный канал. В этом случае я использовал on_failure_callback
, пытаясь перехватить возникшее исключение и отправить журнал ошибок в slack. Сейчас моя проблема в том, что когда я использовал context.get('exception')
, ничего не получалось.
Ниже приведен пример, который я использовал для теста, я ожидаю получить сообщение об ошибке ZeroDivisionError: integer division or modulo by zero
.
def slack_failed_task(context):
failed_alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=slack_channel,
token=slack_token,
text="""Dag: {dag}
*Task*: {task}
*reason*: {reason},
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
## if change to str(context.get('exception')) then I got "None"
reason = context.get('exception')
)
)
return failed_alert.execute(context=context)
def test_py_function(**kwargs):
print(2 / 0)
def sub_dag(parent_dag, child_dag, start_date, schedule_interval):
dag = DAG("{}.{}".format(parent_dag, child_dag),
schedule_interval=schedule_interval,
start_date=start_date,)
t2 = PythonOperator(task_id='sub_task2',
python_callable=test_py_function,
dag=dag,
on_failure_callback=slack_failed_task,
)
return dag
Я также пытался использовать context.get('task_instance').get('error')
и context.get('task_instance').get('log')
, но все еще в состоянии покоя сообщение не получено. Очень признателен, если вы можете предоставить некоторые идеи или поделиться примером для решения этой проблемы.