В Apache Airflow есть способ зафиксировать ошибку команды bash? - PullRequest
0 голосов
/ 12 октября 2018

В Apache Airflow, возможно ли перехватить исходное сообщение об ошибке, которое вызвала сбойная команда bash, вместо ошибки трассировки, созданной Apache Airflow, которая сообщает вам, что произошла ошибка линии, но не совсем, почему она произошла?

Пример строки в Dag:

gsutil_rsync = BashOperator(
        task_id="task1",
        bash_command='gsutil rsync -r s3://bucket/ gs://bucket',
        dag=dag)

1 Ответ

0 голосов
/ 16 октября 2018

Я написал это решение с помощью функции python и PythonOperator и установил xcom_push=True в PythonOperator.

import subprocess
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 15),
    'email': 'me@airflow.com',
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': 1,
}


def run_bash():
    result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
    return result.stdout

run_bash()

with DAG('bash_dag', schedule_interval="@daily", default_args=default_args) as dag:
    start_brach = DummyOperator(task_id='start')

    gsutil_rsync_py = PythonOperator(
        task_id="task1",
        python_callable=run_bash,
        xcom_push=True,
        dag=dag)


    start_brach.set_downstream(gsutil_rsync_py)

И приведу это;

enter image description here

...