Как перехватить переданный параметр --conf в вызываемой группе DAG в Airflow - PullRequest
0 голосов
/ 16 ноября 2018

Я пытаюсь запустить DAG из REST API и передать ему некоторые параметры.Группа обеспечения доступности баз данных должна быть в состоянии отловить параметры и использовать их.Проблема в том, что я могу вызвать DAG из REST API, но DAG не может перехватить переданные параметры.Есть ли способ добиться этого?

Я запускаю DAG из REST API, как показано ниже. Он передает параметры в --conf

http://abcairflow.com:8090/admin/rest_api/api?api=trigger_dag\&dag_id=trigger_test_dag\&conf=%7B%22key%22%3A%2

Как захватить значения, переданные в confзначение в вызываемой DAG.Насколько я знаю, conf должен принимать URL-данные в формате JSON.

Код DAG: `

def run_this_func(**kwargs):
print(kwargs)

run_this = PythonOperator(
    task_id='run_this',
    python_callable=run_this_func,
    dag=dag
)`

Ответы [ 2 ]

0 голосов
/ 19 ноября 2018

Я не знал, что вы можете вызвать DAG с HTTP GET, но я успешно запустил conf с использованием POST и следуя документации https://airflow.apache.org/api.html

Например, запуск метки "trigger_test_dag":

curl -X POST --data '"conf":"{\"key\":\"value\"}"' \
"http://abcairflow.com:8090/api/experimental/dags/trigger_test_dag/dag_runs"

Обратите внимание на экранирование апострофов, так как conf должен быть строкой. Я полагаю, что вы можете выполнить кодирование base 64, а затем декодировать его в DAG, если хотите, если хотите.

0 голосов
/ 16 ноября 2018

К сожалению, это не хорошо документированная функция, но есть примеры DAG, запускающей другую DAG с набором conf, и целевой DAG, использующей его.См. example_trigger_controller_dag и example_trigger_target_dag .Группы обеспечения доступности баз данных, запускаемые оператором, API-интерфейсом REST или интерфейсом командной строки, должны передавать параметр conf одинаково.

conf доступен внутри контекста, поэтому вам необходимо убедиться, что вы передали provide_context=True при использовании PythonOperator.

def run_this_func(**kwargs):
    print(kwargs['conf'])

run_this = PythonOperator(
    task_id='run_this',
    python_callable=run_this_func,
    dag=dag,
    provide_context=True,
)
...