Я хотел бы создать свой собственный пользовательский базовый оператор с дополнительной функцией отправки метрик в стекдрайвер при успешном / неудачном / повторном запуске
Я создал класс для мониторинга
class Monitoring
def __init__(self,
metrics,*args, **kwargs):
self.metrics = builtin_metrics + metrics if metrics is not None else builtin_metrics
def send_metric(self,status, context):
CODE TO SEND TO STACKDRIVER
def _on_success_callback(self,context):
self.send_metric('SUCCESS, context)
class CustomBaseOperator(Monitoring, BaseOperator):
def __init__(self, *args, **kwargs):
Monitoring.__init__(self, *args, **kwargs)
BaseOperator.__init__(self,
on_success_callback=self._on_success_callback
*args, **kwargs)
Я разработал его таким образом, чтобы все функции мониторинга обрабатывались в его собственном классе, и если мы хотим добавить дополнительные функции к базовому оператору, это можно сделать с помощью наследования от дополнительных классов
Это хорошо работает в Python, нов воздушном потоке это иногда работает, но когда я перезагружаю сервер (локально), обычно происходит сбой:
File "/Users/nirben/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/views.py", line 1335, in recurse_nodes
visited.add(task)
File "/Users/nirben/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models.py", line 2517, in __hash__
hash(val)
File "/Users/nirben/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models.py", line 2517, in __hash__
hash(val)
File "/Users/nirben/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models.py", line 2517, in __hash__
hash(val)
[Previous line repeated 314 more times]
RecursionError: maximum recursion depth exceeded
Мне нужны методы on_success_callback экземпляра, поскольку они регистрируют данные, специфичные для данной задачи./ dag
Существуют ли какие-либо ограничения со стороны воздушного потока, которые препятствуют этому?