Воздушный поток: установить пользовательский run_id для TriggerDagRunOperator - PullRequest
2 голосов
/ 04 октября 2019

При использовании TriggerDagRunOperator для запуска другого DAG, он просто дает общее имя, например, trig_timestamp:

enter image description here

Можно ли присвоить этому идентификатору прогоназначимое имя, чтобы я мог легко идентифицировать различные пробеги даг?

1 Ответ

1 голос
/ 29 октября 2019

Вы не можете немедленно сделать это с TriggerDagOperator, так как "run_id" генерируется внутри его метода execute. Однако вы можете реализовать свой собственный оператор CustomTriggerDagOperator, который будет вести себя так, как вы хотите / нуждаетесь. Например:

class CustomTriggerDagOperator(TriggerDagOperator):
    def execute(self, context):
        if self.execution_date is not None:
            run_id = 'trig__{}'.format(self.execution_date)
            self.execution_date = timezone.parse(self.execution_date)
        else:
            run_id = 'trig__' + timezone.utcnow().isoformat()

        run_id += f'{self.trigger_dag_id}'

        dro = DagRunOrder(run_id=run_id)
        if self.python_callable is not None:
            dro = self.python_callable(context, dro)
        if dro:
            trigger_dag(dag_id=self.trigger_dag_id,
                        run_id=dro.run_id,
                        conf=json.dumps(dro.payload),
                        execution_date=self.execution_date,
                        replace_microseconds=False)
        else:
            self.log.info("Criteria not met, moving on")

В этом примере выше просто добавляется id сработавшего символа dag. Вы можете использовать эту же стратегию для произвольной установки run_id.

...