Вы не можете немедленно сделать это с TriggerDagOperator
, так как "run_id" генерируется внутри его метода execute. Однако вы можете реализовать свой собственный оператор CustomTriggerDagOperator
, который будет вести себя так, как вы хотите / нуждаетесь. Например:
class CustomTriggerDagOperator(TriggerDagOperator):
def execute(self, context):
if self.execution_date is not None:
run_id = 'trig__{}'.format(self.execution_date)
self.execution_date = timezone.parse(self.execution_date)
else:
run_id = 'trig__' + timezone.utcnow().isoformat()
run_id += f'{self.trigger_dag_id}'
dro = DagRunOrder(run_id=run_id)
if self.python_callable is not None:
dro = self.python_callable(context, dro)
if dro:
trigger_dag(dag_id=self.trigger_dag_id,
run_id=dro.run_id,
conf=json.dumps(dro.payload),
execution_date=self.execution_date,
replace_microseconds=False)
else:
self.log.info("Criteria not met, moving on")
В этом примере выше просто добавляется id сработавшего символа dag. Вы можете использовать эту же стратегию для произвольной установки run_id.