Конфигурация времени выполнения для PythonOperator - PullRequest
0 голосов
/ 14 декабря 2018

Я не могу найти рабочую документацию о том, как использовать переменные JSON, переданные с "-c", например, для задания обратной засыпки.

Я печатал свои задачи на Python ** kwargs, чтобы выяснить это, но я все еще не могу определить это.provide_context = True

Может ли кто-нибудь указать мне правильное направление?

Итак, что я хочу сделать:

airflow backfill mydag -c '{"override":"yes"}' -s 2018-12-01 -e 2018-12-12

У меня есть PythonOperator:

PythonOperator(
    task_id = 'task_identifier',
    python_callable = 'run_task',
    op_kwargs = {
        'task_conf': task_config 
    },
    provide_context=True,
    dag = this_dag
)

В run_task я хотел бы получить доступ к переменной переопределения:

def run_task(*args, **kwargs): 

    dag_run = kwargs.get('dag_run')
    logging.info(kwargs['dag_run'].conf.get('override'))

Но я не могу найти метод для доступа к этой переменной переопределения

[2018-12-17 10:07:24,649] {models.py:1760} ERROR - 'NoneType' object has no attribute 'get'
Traceback (most recent call last):
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/daimonie/airflow/dags/dag_scheduled_queries.py", line 65, in run_query
    logging.info(kwargs['dag_run'].conf.get('override'))

Редактировать:Я нашел настройки и описание конфигурации, которые, кажется, указывают, что эти параметры уже должны быть установлены в коде

# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params=True

Параметр donot_pickle был установлен в False.

Ответы [ 2 ]

0 голосов
/ 15 декабря 2018

Вы можете передать параметры из CLI, используя -c '{"key":"value"}', а затем использовать его в вызываемой функции Python как "dag_run.conf["key"]"

В вашем случае,

Оператор :

PythonOperator(
    task_id = 'task_identifier',
    python_callable = 'run_task',
    provide_context=True,
    dag = this_dag
)

Функция вызова :

def run_task(**kwargs):
    print(kwargs["dag_run"].conf["override"]
0 голосов
/ 14 декабря 2018

Вам нужно сделать две вещи.

  1. Заменить kwargs['run_dag'].conf.get('override') на kwargs['dag_run'].conf.get('override').
  2. Также измените подпись с def run_task(*args, **kwargs): на def run_task(**kwargs):
...