Установить параметры dag_run.conf при вызове теста воздушного потока - PullRequest
0 голосов
/ 02 мая 2018

Кто-нибудь знает, есть ли способ установить параметры dag_run.conf при запуске airflow test в приглашении bash?

Например, я скачал example_trigger_target_dag из официального репозитория воздушного потока, и я хотел бы протестировать задачу run_this. Обычно я делал бы следующее:

~/$ airflow test example_trigger_target_dag run_this '2018-01-01'

Однако при выполнении этого выдает ошибку:

--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2018-05-02 10:50:01,154] {models.py:1342} INFO - Executing <Task(PythonOperator): run_this> on 2018-01-01 00:00:00
[2018-05-02 10:50:01,262] {models.py:1417} ERROR - 'NoneType' object has no attribute 'conf'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 80, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/annalect/uk_ds_airflow/dags/playpen/example_trigger_target_dag.py", line 56, in run_this_func
    print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
AttributeError: 'NoneType' object has no attribute 'conf'

Я попытался использовать аргумент task_params, но у меня либо неверный синтаксис, либо он не достиг того, что я получаю, поскольку выдает ту же ошибку, что и выше:

~/$ airflow test --task_params '{"kwargs": {"dag_run": {"conf": {"message": "Hey world"}}}}' example_trigger_target_dag run_this '2018-01-01'

[2018-05-02 11:10:58,065] {models.py:1441} INFO - Marking task as FAILED.
[2018-05-02 11:10:58,070] {models.py:1462} ERROR - 'NoneType' object has no attribute 'conf'

Так кто-нибудь знает, как проверить задачу, которая зависит от dag_run.conf значений?

Спасибо!

1 Ответ

0 голосов
/ 16 мая 2018

Для команды airflow test нет опции --conf, но вы можете обойти эту проблему, передав параметры в python_callable задачи.

В вызываемой, если установлено kwargs['test_mode'], вы можете получить параметры для создания фиктивного DagRun объекта, например, так:

from airflow.models import DagRun
...

def run_this_func(ds, **kwargs):
    if kwargs['test_mode']:
        kwargs['dag_run'] = DagRun(conf=kwargs['params'])

    print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))

Чтобы проверить example_trigger_target_dag, просто выполните:

airflow test example_trigger_target_dag test_trigger_dagrun "2018-01-01" -tp '{"message":"Hello world"}'

и вы получите:

Remotely received value of Hello world for key=message

Теперь вместо того, чтобы помещать тестовый код в свои задачи, вы можете написать декоратор. Кроме того, поскольку мы просто используем conf атрибут DagRun, мы также можем использовать SimpleNamespace. И, наконец, чтобы избежать потенциальной ошибки ключа при поиске kwargs, мы можем использовать get со значением по умолчанию.

from types import SimpleNamespace

def allow_conf_testing(func):
    def wrapper(*args, **kwargs):
        if kwargs.get('test_mode', False):
            kwargs['dag_run'] = SimpleNamespace(conf=kwargs.get('params', {}))
        func(*args, **kwargs)
    return wrapper

@allow_conf_testing
def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
...