Передайте значения conf в тесте DAG / модульного теста воздушного потока программно - PullRequest
0 голосов
/ 17 марта 2020

Возможно передать произвольные значения conf при запуске DAG, однако, я озадачен сделать это на уровне кода, поэтому у меня может быть надлежащая проверка работающего модуля

airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message":"value"}'
import datetime
from datetime import datetime

import pytest
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import PythonOperator

pytest_plugins = ['helpers_namespace']

@pytest.fixture
def test_dag():
    """Our DAG fixture"""
    return DAG(
        dag_id='lrw-sklearn-dag-pytest',
        start_date=datetime(2019, 1, 1),
        schedule_interval=None,
        default_args={
            'owner': 'airflow',
            'start_date': datetime(2019, 1, 1)
        },
        params={
            'param1': 'value1',
            'param2': 'value2'
        }
    )

@pytest.helpers.register
def run_task(task, dag):
    """Helper to run any task"""
    dag.clear()
    ti = TaskInstance(task=task, execution_date=datetime.now())
    task.execute(ti.get_template_context())


def test_dag_run_outside_airflow(test_dag, tmpdir):
    """Test execution for the DAG"""
    from .tasks import train

    op_train = PythonOperator(
        task_id='train',
        provide_context=True,
        python_callable=train,
        dag=test_dag,
    )

    pytest.helpers.run_task(task=op_train, dag=test_dag)

tasks.py / train

def train(ds, **kwargs):
    """Train Task."""

    params = kwargs.get('dag_run').conf

    # prepare specified params for loader
    load_params = dict(
        bucket=config.AWS_S3_BUCKET_NAME,
        project_id=params.get('project_id'),
        iv_filename=params.get('iv_filename'),
        dv_filename=params.get('dv_filename'),
        cat_vars_filename=params.get('cat_vars_filename'),
        scale_vars_filename=params.get('scale_vars_filename'),
        iv_idvar=params.get('iv_idvar'),
        dv_idvar=params.get('dv_idvar'),
        global_filter_variable=params.get('global_filter_variable'),
    )

    with load_data(**load_params) as data_dict:
        xgbt = XGBoostTraining(
            data=data_dict,
            scorers=params.get('scorers'),
            iterations=params.get('iterations'),
            repeats=params.get('repeats'),
        )

        trained_models = xgbt.run()

        # send email notification with report of results
        # see actual templates used in /templates folder
        # it sends both the text and html version of the email
        send_trained_models_by_email(
            trained_models, email=params.get('email'))

    return 'training finished'


op_train = PythonOperator(
    task_id='train',
    provide_context=True,
    python_callable=train,
    dag=dag,
)

Итак, как эмулировать conf, чтобы params = kwargs.get('dag_run').conf in train () получил значения, которые я установил из устройства

тест?

...