С потоком воздуха 1.10, когда я передаю conf TriggerDagRunOperator, например
dag = DAG(
dag_id='all_dist_cp',
default_args=args,
dagrun_timeout=timedelta(minutes=60),
)
distcp_1 = TriggerDagRunOperator(
task_id="distcp_1",
trigger_dag_id="dist_cp",
conf={
"SERVICE_ID": "A",
"SOURCE":"...",
"DESTINATION":"..."
},
dag=dag
)
distcp_2 = TriggerDagRunOperator(
task_id="distcp_2",
trigger_dag_id="dist_cp",
conf={
"SERVICE_ID": "B",
"SOURCE":"...",
"DESTINATION":"..."
},
dag=dag
)
Он жалуется, что не может установить conf для TriggerDagRunOperator. Так почему же TriggerDagRunOperator имеет аргумент conf?
dagrun_operator.py:65: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator (task_id: distcp_1). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2019-11-04 17:07:59,083] {base_task_runner.py:115} INFO - Job 70: Subtask distcp_1 *args: ()
[2019-11-04 17:07:59,084] {base_task_runner.py:115} INFO - Job 70: Subtask distcp_1 **kwargs: {'conf': {'SERVICE_ID': 'A', 'SOURCE': '', 'DESTINATION': ''}}
[2019-11-04 17:07:59,084] {base_task_runner.py:115} INFO - Job 70: Subtask distcp_1 super(TriggerDagRunOperator, self).__init__(*args, **kwargs)
Как передать параметры в distcp dag? У меня какой-то дикт должен быть переведен в distcp DAG
distcp dag выглядит как
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='distcp',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
dist_cp = BashOperator(
task_id='dist_cp',
bash_command="""
SERVICE_ID={{dag_run.conf['SERVICE_ID']}}
hadoop distcp \
-Dmapreduce.job.queuename=small \
....
""",
dag=dag,
)
dist_cp