Нам нужно приостановить поток процесса, пока клиент не подтвердит, что он в порядке с данными, затем продолжить поток.
Как мы это делаем:
Подготовьте электронное письмо с прикрепленным pdf, отправьте клиенту для проверки, если он согласится, что pdf правильно выполнен, тогда клиент возобновит выполнение.
Мы выполняем его, останавливая поток и пытаясь перезапустить его, используя две параллельные задачи: «send_validation_email_pdf» и «user_validation» соответственно. Мы поставили задачу «user_validation» провалиться «по назначению». В то же время задача «send_validation_email_pdf» отправляет документы в формате pdf со ссылкой, которая позволяет клиенту установить для задачи «user_validation» статус «помечать как успешный».
Пример гиперссылки:
http://localhost:8080/admin/airflow/success?task_id=user_validation&dag_id=rf.duree&upstream=false&downstream=false&future=false&past=false&execution_date=2019-05-24T00%3A00%3A00%2B00%3A00&origin=http%3A%2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Ftree%3Fdag_id%3Drf.duree&confirmed=true
Мы бы хотели, чтобы эта задача была выполнена успешно и возобновила весь поток. Однако одного лишь обозначения этой задачи как «успеха» недостаточно. Причина заключается в том, что следующая задача остается в том же состоянии = «upstream_failed» и не будет перезапущена.
Я попытался включить в следующую задачу под названием «fin_send_email_validation» следующий параметр: trigger_rule = TriggerRule.ALL_SUCCESS - но это не сработало.
Также попытался использовать параметр в «fin_send_email_validation» следующий параметр «depen_on_past = True». Также не работает.
Будет ли у кого-то более четкое представление о том, как приостановить поток процесса, пока клиент не подтвердит, что он в порядке с данными, а затем продолжить поток?
Или кто-то может дать мне представление о том, как разблокировать то, что я уже пытаюсь сделать.
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
send_validation_email_pdf = PythonOperator(
task_id="send_validation_email_pdf",
provide_context=True,
python_callable=set_send_validation_email_pdf,
dag=dag,
)
user_validation = PythonOperator(
task_id="user_validation",
retries=0,
email_on_failure=False,
python_callable=user_validation,
dag=dag,
)
fin_send_email_validation = DummyOperator(task_id="fin_send_email_validation", trigger_rule=TriggerRule.ALL_SUCCESS,
depends_on_past=True, dag=dag)
fin_refresh_TDE >> send_validation_email_pdf >> fin_send_email_validation
fin_refresh_TDE >> user_validation >> fin_send_email_validation```
Pause process flow, until the client confirmed he is OK with the data, then continue the flow.