Возможно передать произвольные значения conf при запуске DAG, однако, я озадачен сделать это на уровне кода, поэтому у меня может быть надлежащая проверка работающего модуля
airflow trigger_dag 'example_dag_conf' -r 'run_id' --conf '{"message":"value"}'
import datetime
from datetime import datetime
import pytest
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import PythonOperator
pytest_plugins = ['helpers_namespace']
@pytest.fixture
def test_dag():
"""Our DAG fixture"""
return DAG(
dag_id='lrw-sklearn-dag-pytest',
start_date=datetime(2019, 1, 1),
schedule_interval=None,
default_args={
'owner': 'airflow',
'start_date': datetime(2019, 1, 1)
},
params={
'param1': 'value1',
'param2': 'value2'
}
)
@pytest.helpers.register
def run_task(task, dag):
"""Helper to run any task"""
dag.clear()
ti = TaskInstance(task=task, execution_date=datetime.now())
task.execute(ti.get_template_context())
def test_dag_run_outside_airflow(test_dag, tmpdir):
"""Test execution for the DAG"""
from .tasks import train
op_train = PythonOperator(
task_id='train',
provide_context=True,
python_callable=train,
dag=test_dag,
)
pytest.helpers.run_task(task=op_train, dag=test_dag)
tasks.py / train
def train(ds, **kwargs):
"""Train Task."""
params = kwargs.get('dag_run').conf
# prepare specified params for loader
load_params = dict(
bucket=config.AWS_S3_BUCKET_NAME,
project_id=params.get('project_id'),
iv_filename=params.get('iv_filename'),
dv_filename=params.get('dv_filename'),
cat_vars_filename=params.get('cat_vars_filename'),
scale_vars_filename=params.get('scale_vars_filename'),
iv_idvar=params.get('iv_idvar'),
dv_idvar=params.get('dv_idvar'),
global_filter_variable=params.get('global_filter_variable'),
)
with load_data(**load_params) as data_dict:
xgbt = XGBoostTraining(
data=data_dict,
scorers=params.get('scorers'),
iterations=params.get('iterations'),
repeats=params.get('repeats'),
)
trained_models = xgbt.run()
# send email notification with report of results
# see actual templates used in /templates folder
# it sends both the text and html version of the email
send_trained_models_by_email(
trained_models, email=params.get('email'))
return 'training finished'
op_train = PythonOperator(
task_id='train',
provide_context=True,
python_callable=train,
dag=dag,
)
Итак, как эмулировать conf, чтобы params = kwargs.get('dag_run').conf
in train () получил значения, которые я установил из устройства
тест?