установите отметку как успешную и принудительно запустите все последующие задачи - PullRequest
0 голосов
/ 06 июня 2019

Нам нужно приостановить поток процесса, пока клиент не подтвердит, что он в порядке с данными, затем продолжить поток.

Как мы это делаем: Подготовьте электронное письмо с прикрепленным pdf, отправьте клиенту для проверки, если он согласится, что pdf правильно выполнен, тогда клиент возобновит выполнение.

Мы выполняем его, останавливая поток и пытаясь перезапустить его, используя две параллельные задачи: «send_validation_email_pdf» и «user_validation» соответственно. Мы поставили задачу «user_validation» провалиться «по назначению». В то же время задача «send_validation_email_pdf» отправляет документы в формате pdf со ссылкой, которая позволяет клиенту установить для задачи «user_validation» статус «помечать как успешный».

Пример гиперссылки:

http://localhost:8080/admin/airflow/success?task_id=user_validation&dag_id=rf.duree&upstream=false&downstream=false&future=false&past=false&execution_date=2019-05-24T00%3A00%3A00%2B00%3A00&origin=http%3A%2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Ftree%3Fdag_id%3Drf.duree&confirmed=true

Мы бы хотели, чтобы эта задача была выполнена успешно и возобновила весь поток. Однако одного лишь обозначения этой задачи как «успеха» недостаточно. Причина заключается в том, что следующая задача остается в том же состоянии = «upstream_failed» и не будет перезапущена.

Я попытался включить в следующую задачу под названием «fin_send_email_validation» следующий параметр: trigger_rule = TriggerRule.ALL_SUCCESS - но это не сработало. Также попытался использовать параметр в «fin_send_email_validation» следующий параметр «depen_on_past = True». Также не работает.

Будет ли у кого-то более четкое представление о том, как приостановить поток процесса, пока клиент не подтвердит, что он в порядке с данными, а затем продолжить поток? Или кто-то может дать мне представление о том, как разблокировать то, что я уже пытаюсь сделать.

from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

send_validation_email_pdf = PythonOperator(
    task_id="send_validation_email_pdf",
    provide_context=True,
    python_callable=set_send_validation_email_pdf,
    dag=dag,
)

user_validation = PythonOperator(
    task_id="user_validation",
    retries=0,
    email_on_failure=False,
    python_callable=user_validation,
    dag=dag,
)

fin_send_email_validation = DummyOperator(task_id="fin_send_email_validation", trigger_rule=TriggerRule.ALL_SUCCESS,
                                          depends_on_past=True, dag=dag)

fin_refresh_TDE >> send_validation_email_pdf >> fin_send_email_validation
fin_refresh_TDE >> user_validation >> fin_send_email_validation```


Pause process flow, until the client confirmed he is OK with the data, then continue the flow.

1 Ответ

0 голосов
/ 06 июня 2019

Задача, которая просто спит, с соответствующей задержкой soft_fail. Если все выполнено, просто отметьте его как успешный, и остальная часть рабочего процесса должна продолжиться.

def user_validation():
    time.sleep(86350)


user_validation = PythonOperator(
        task_id="user_validation",
        retries=0,
        email_on_failure=False,
        soft_fail=True,
        timeout=86400,
        python_callable=user_validation,
        dag=dag,
    )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...