поймать исключение AirflowSkipException в Airflow для вызова S3KeySensor - PullRequest
0 голосов
/ 27 марта 2019

Вызов S3KeySensor помечает себя и последующие задачи как пропущенные, когда для soft_fail установлено значение True.Однако мне нужно отловить это исключение , т.е. AirflowSkipException и выполнить некоторые действия на его основе.Моя проблема заключается в использовании блока try и catch, который не перехватывает никаких исключений, поэтому я не могу выполнять последующие задачи, которые зависят от него.

Использовал приведенную ниже документацию по коду воздушного потока, чтобы выяснить тип исключения: https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/base_sensor_operator.html

Ниже приведено следующее исключение:

try:
    s3_file_watcher_task_operator = S3KeySensor(
        task_id='s3_file_watcher_task',
        bucket_key="{{ task_instance.xcom_pull(key='s3_poll_config', task_ids='dag_start_config_setup_task')['source_extract_regex'] }}",
        poke_interval=float(Variable.get("S3_POKE_INTERVAL")),  # Checking interval in seconds
        timeout=float(Variable.get("S3_TIMEOUT")),   # Total seconds for Timeout
        soft_fail=True,   #Ensure task status becomes 'skipped' in case of failure. Subsequent task will handle soft failures.
        wildcard_match=True,
        bucket_name="{{ task_instance.xcom_pull(key='s3_poll_config', task_ids='dag_start_config_setup_task')['landing_bucket'] }}",
        aws_conn_id="{{ task_instance.xcom_pull(key='s3_poll_config', task_ids='dag_start_config_setup_task')['aws_conn_id'] }}",
        dag=dag)
except AirflowSkipException as e:
    closetaskinstance(job_exec_config, task_execution_id[0], 'FAILED', str(datetime.now()))
    closedaginstance(job_exec_config, dag_execution_id, 'FAILED', str(datetime.now()))
    raise
...