Чтобы отслеживать затраты GCP с использованием меток , хотелось бы расширить BigQueryExecuteQueryOperator некоторыми дополнительными метками, чтобы каждый экземпляр задачи автоматически получал эти метки в своем конструкторе.
class ExtendedBigQueryExecuteQueryOperator(BigQueryExecuteQueryOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs) -> None:
task_labels = {
'dag_id': '{{ dag.dag_id }}',
'task_id': kwargs.get('task_id'),
'ds': '{{ ds }}',
# ugly, all three params got in diff. ways
}
super().__init__(*args, **kwargs)
if self.labels is None:
self.labels = task_labels
else:
self.labels.update(task_labels)
with DAG(dag_id=...,
start_date=...,
schedule_interval=...,
default_args=...) as dag:
t1 = ExtendedBigQueryExecuteQueryOperator(
task_id=f't1',
sql=f'SELECT 1;',
labels={'some_additional_label2':'some_additional_label2'}
# all labels should be: dag_id, task_id, ds, some_additional_label2
)
t2 = ExtendedBigQueryExecuteQueryOperator(
task_id=f't2',
sql=f'SELECT 2;',
labels={'some_additional_label3':'some_additional_label3'}
# all labels should be: dag_id, task_id, ds, some_additional_label3
)
t1 >> t2
но тогда я теряю метки уровня задач some_additional_label2
или some_additional_label3
.