Для Apache Airflow Как я могу передать параметры при ручном запуске DAG через CLI? - PullRequest
0 голосов
/ 07 декабря 2018

Я использую Airflow для управления выполнением и расписанием задач ETL.DAG был создан, и он отлично работает.Но возможно ли передать параметры при ручном запуске dag через cli.

Например: моя группа доступности баз данных работает каждый день в 01:30 и обрабатывает данные за вчерашний день (диапазон времени с 01:30 вчера по 01:30 сегодня).Там могут быть некоторые проблемы с источником данных.Мне нужно повторно обработать эти данные (вручную указать диапазон времени).

Так что я могу создать такой DAG с воздушным потоком, когда это запланировано, что диапазон времени по умолчанию - от 01:30 вчера до 01:30сегодня.Затем, если что-то не так с источником данных, мне нужно вручную вызвать группу обеспечения доступности баз данных и вручную передать временной диапазон в качестве параметров.

Как я знаю, airflow test имеет -tp, который может передавать параметры в задачу.Но это только для тестирования конкретной задачи.и airflow trigger_dag не имеет опции -tp.Так есть ли способ tigger_dag и передать параметры в DAG, и тогда Оператор может прочитать эти параметры?

Спасибо!

Ответы [ 2 ]

0 голосов
/ 08 декабря 2018

Вы можете передать параметры из CLI, используя --conf '{"key":"value"}', а затем использовать его в файле DAG как "{{ dag_run.conf["key"] }}" в шаблонном поле.

CLI :

airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message":"value"}'

Файл DAG :

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_dag_conf',
    default_args=args,
    schedule_interval=None,
)

def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag,
)
0 голосов
/ 07 декабря 2018

Это должно работать согласно документации по воздушному потоку: https://airflow.apache.org/cli.html#trigger_dag

airflow trigger_dag -c '{"key1":1, "key2":2}' dag_id

Убедитесь, что значение -c является допустимой строкой json, поэтому перенос двойных кавычекключи необходимы здесь.

...