Использование аргументов оператора воздушного потока внутри функции - PullRequest
0 голосов
/ 26 февраля 2020

У меня есть две задачи в моем dag, где второй dag вызывает пользовательский оператор Sqltquery. Оператор Sqltquery получает два аргумента, который имеет значение Dynami c, и извлекает результат запроса. Использование on_failure_callback для вызова функции с именем fail_log. Если есть какой-либо сбой, внутри функции fail_log, мне нужно захватить значение "sql" & "sqlite_conn_id". Я не использовал параметр шаблонов. Как это можно сделать? Пожалуйста, помогите.

Образец Dag:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import Sqltquery
from datetime import datetime,timedelta
import sys

def fail_log(context):
    ###Need to capture value of "sql" & "sqlite_conn_id" which was passed as arguments to Sqltquery operator.And do some activity based on values.

args = {'owner': 'airflow','depends_on_past': False,'on_failure_callback': fail_log,'start_date': datetime(2020,2,12)}

dag = DAG(dag_id='test_params',default_args=args,catchup=False,schedule_interval=None)

t1 = DummyOperator(task_id='test_task',dag=dag)
t2 = Sqltquery(task_id='test_dt_ft', sql="select distinct columnA from Table;", sqlite_conn_id="Dbconnection", dag=dag, provide_context=True)
t1 >> t2
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...