Воздушный поток определяют on_success_callback в базовом операторе - PullRequest
0 голосов
/ 05 мая 2019

Я хотел бы создать свой собственный пользовательский базовый оператор с дополнительной функцией отправки метрик в стекдрайвер при успешном / неудачном / повторном запуске

Я создал класс для мониторинга

class Monitoring
    def __init__(self,
             metrics,*args, **kwargs):
        self.metrics = builtin_metrics + metrics if metrics is not None else builtin_metrics
    def send_metric(self,status, context):
       CODE TO SEND TO STACKDRIVER
    def _on_success_callback(self,context):
       self.send_metric('SUCCESS, context)

class CustomBaseOperator(Monitoring, BaseOperator):
      def __init__(self,  *args, **kwargs):
         Monitoring.__init__(self, *args, **kwargs)
         BaseOperator.__init__(self,
                          on_success_callback=self._on_success_callback
                          *args, **kwargs)

Я разработал его таким образом, чтобы все функции мониторинга обрабатывались в его собственном классе, и если мы хотим добавить дополнительные функции к базовому оператору, это можно сделать с помощью наследования от дополнительных классов

Это хорошо работает в Python, нов воздушном потоке это иногда работает, но когда я перезагружаю сервер (локально), обычно происходит сбой:

File "/Users/nirben/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/views.py", line 1335, in recurse_nodes
visited.add(task)
File "/Users/nirben/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models.py", line 2517, in __hash__
hash(val)
File "/Users/nirben/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models.py", line 2517, in __hash__
hash(val)
File "/Users/nirben/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models.py", line 2517, in __hash__
hash(val)
[Previous line repeated 314 more times]
RecursionError: maximum recursion depth exceeded

Мне нужны методы on_success_callback экземпляра, поскольку они регистрируют данные, специфичные для данной задачи./ dag

Существуют ли какие-либо ограничения со стороны воздушного потока, которые препятствуют этому?

...