Расширьте BigQueryExecuteQueryOperator дополнительными метками, используя jinja2 - PullRequest
0 голосов
/ 23 февраля 2020

Чтобы отслеживать затраты GCP с использованием меток , хотелось бы расширить BigQueryExecuteQueryOperator некоторыми дополнительными метками, чтобы каждый экземпляр задачи автоматически получал эти метки в своем конструкторе.

class ExtendedBigQueryExecuteQueryOperator(BigQueryExecuteQueryOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs) -> None:
        task_labels = {
            'dag_id': '{{ dag.dag_id }}',
            'task_id': kwargs.get('task_id'),
            'ds': '{{ ds }}',
            # ugly, all three params got in diff. ways
        }
        super().__init__(*args, **kwargs)
        if self.labels is None:
            self.labels = task_labels
        else:
            self.labels.update(task_labels)

with DAG(dag_id=...,
         start_date=...,
         schedule_interval=...,
         default_args=...) as dag:

    t1 = ExtendedBigQueryExecuteQueryOperator(
        task_id=f't1',
        sql=f'SELECT 1;',
        labels={'some_additional_label2':'some_additional_label2'}
        # all labels should be: dag_id, task_id, ds, some_additional_label2
    )

    t2 = ExtendedBigQueryExecuteQueryOperator(
        task_id=f't2',
        sql=f'SELECT 2;',
        labels={'some_additional_label3':'some_additional_label3'}
        # all labels should be: dag_id, task_id, ds, some_additional_label3
    )

    t1 >> t2

но тогда я теряю метки уровня задач some_additional_label2 или some_additional_label3.

1 Ответ

1 голос
/ 24 февраля 2020

Вы можете создать следующую политику в airflow_local_settings.py:

def policy(task):
    if task.__class__.__name__ == "BigQueryExecuteQueryOperator":
        task.labels.update({'dag_id': task.dag_id, 'task_id': task.task_id})

Из документов:

Ваш локальный поток воздуха Файл настроек может определять функцию политики, которая может изменять атрибуты задачи на основе других атрибутов задачи или группы обеспечения доступности баз данных. Он получает один аргумент в качестве ссылки на объекты задачи и, как ожидается, изменит свои атрибуты.

Подробнее о применении политики: https://airflow.readthedocs.io/en/1.10.9/concepts.html#cluster -policy

В этом случае вам не нужно расширять BigQueryExecuteQueryOperator. Единственная пропущенная часть - это дата исполнения , которую вы можете установить в самой задаче.

Пример:

with DAG(dag_id=...,
         start_date=...,
         schedule_interval=...,
         default_args=...) as dag:

    t1 = BigQueryExecuteQueryOperator(
        task_id=f't1',
        sql=f'SELECT 1;',
        lables={'some_additional_label2':'some_additional_label2', 'ds': '{{ ds }}'}
    )

airflow_local_settings файл должен быть в вашем PYTHONPATH . Вы можете поместить его в $AIRFLOW_HOME/config или в свой каталог dags.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...