Не знаю, есть ли способ проще, но можно создать собственный оператор по образцу TriggerDagRunOperator (https://github.com/apache/airflow/blob/master/airflow/operators/dagrun_operator.py), который получает dag_id, вызывая переданную ему функцию (Callable).
Вот что я набросал на скорую руку (это определённо можно улучшить):
from airflow.models import DAG
from airflow.utils.dates import days_ago, timedelta
from airflow.operators.dagrun_operator import TriggerDagRunOperator
import random
import datetime
from typing import Dict, Optional, Union, Callable
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
class TriggerDagRunWithFuncOperator(BaseOperator):
    """
    Triggers a DAG run for the ``dag_id`` returned by a callable.

    The target ``dag_id`` is not fixed when the operator is built:
    ``get_dag_name_f`` is called with no arguments every time ``execute`` runs
    and its return value is used as the ``dag_id`` to trigger.

    :param get_dag_name_f: zero-argument callable returning the dag_id to trigger
    :type get_dag_name_f: Callable
    :param conf: Configuration for the DAG run
    :type conf: dict
    :param execution_date: Execution date for the dag (templated)
    :type execution_date: str or datetime.datetime
    :raises TypeError: if ``get_dag_name_f`` is not callable, or if
        ``execution_date`` is not a str, a datetime.datetime or None
    """

    template_fields = ("execution_date", "conf")
    ui_color = "#ffefeb"

    @apply_defaults
    def __init__(
        self,
        get_dag_name_f: Callable,
        conf: Optional[Dict] = None,
        execution_date: Optional[Union[str, datetime.datetime]] = None,
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)

        # Fail at construction rather than in the middle of a task run.
        if not callable(get_dag_name_f):
            raise TypeError(
                "Expected a callable for get_dag_name_f. "
                "Got {}".format(type(get_dag_name_f))
            )
        if not isinstance(execution_date, (str, datetime.datetime, type(None))):
            raise TypeError(
                "Expected str or datetime.datetime type for execution_date. "
                "Got {}".format(type(execution_date))
            )

        self.conf = conf
        self.get_dag_name_f = get_dag_name_f
        # Kept exactly as given (str, datetime or None); execute() converts a
        # local copy, so the attribute's type never changes after construction.
        self.execution_date: Optional[Union[str, datetime.datetime]] = execution_date

    def execute(self, context: Dict):
        """
        Resolve the target dag_id and trigger a run of that DAG.

        The run_id is ``"trig__"`` followed by the execution date: the ISO
        format of a datetime, the string as given, or the current UTC time when
        no execution date was supplied.

        :param context: task context passed to the operator (not used here)
        :type context: dict
        """
        # Work on a local so the instance attribute is not overwritten.
        execution_date = self.execution_date

        if isinstance(execution_date, datetime.datetime):
            run_id = "trig__{}".format(execution_date.isoformat())
        elif isinstance(execution_date, str):
            run_id = "trig__{}".format(execution_date)
            # trigger_dag() expects a datetime, not a string.
            execution_date = timezone.parse(execution_date)
        else:
            run_id = "trig__{}".format(timezone.utcnow().isoformat())

        dag_id_to_call = self.get_dag_name_f()

        trigger_dag(
            dag_id=dag_id_to_call,
            run_id=run_id,
            conf=self.conf,
            execution_date=execution_date,
            replace_microseconds=False,
        )
# Default arguments handed to the DAG below.
args = dict(owner='arocketman', start_date=days_ago(1))

# The example DAG that hosts the trigger task; no schedule interval is set.
dag = DAG(
    schedule_interval=None,
    default_args=args,
    dag_id='dyna_dag',
)
def your_function():
    """Return the dag_id of the DAG that should be triggered."""
    target_dag_id = 'my_sample_dag'
    return target_dag_id
# Register the trigger task on ``dag``; the dag_id to trigger comes from
# calling ``your_function``.
with dag:
    run_this_task = TriggerDagRunWithFuncOperator(
        get_dag_name_f=your_function,
        task_id='run_this',
    )