Передача конф в TriggerDagRunOperator - PullRequest
0 голосов
/ 04 ноября 2019

С потоком воздуха 1.10, когда я передаю conf TriggerDagRunOperator, например

dag = DAG(
    dag_id='all_dist_cp',
    default_args=args,
    dagrun_timeout=timedelta(minutes=60),
)

distcp_1 = TriggerDagRunOperator(
    task_id="distcp_1",
    trigger_dag_id="dist_cp",
    conf={
        "SERVICE_ID": "A",
        "SOURCE":"...",
        "DESTINATION":"..."
    },
    dag=dag
)

distcp_2 = TriggerDagRunOperator(
    task_id="distcp_2",
    trigger_dag_id="dist_cp",
    conf={
        "SERVICE_ID": "B",
        "SOURCE":"...",
        "DESTINATION":"..."
    },
    dag=dag
)

Он жалуется, что не может установить conf для TriggerDagRunOperator. Так почему же TriggerDagRunOperator имеет аргумент conf?

dagrun_operator.py:65: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator (task_id: distcp_1). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2019-11-04 17:07:59,083] {base_task_runner.py:115} INFO - Job 70: Subtask distcp_1 *args: ()
[2019-11-04 17:07:59,084] {base_task_runner.py:115} INFO - Job 70: Subtask distcp_1 **kwargs: {'conf': {'SERVICE_ID': 'A', 'SOURCE': '', 'DESTINATION': ''}}
[2019-11-04 17:07:59,084] {base_task_runner.py:115} INFO - Job 70: Subtask distcp_1   super(TriggerDagRunOperator, self).__init__(*args, **kwargs)

Как передать параметры в distcp dag? У меня какой-то дикт должен быть переведен в distcp DAG

distcp dag выглядит как

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='distcp',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)

dist_cp = BashOperator(
    task_id='dist_cp',
    bash_command="""
SERVICE_ID={{dag_run.conf['SERVICE_ID']}}
hadoop distcp \
-Dmapreduce.job.queuename=small \
....
    """,
    dag=dag,
)

dist_cp
...