prometheus / client_ python: Как назначить новый реестр без перезагрузки? - PullRequest
1 голос
/ 20 июня 2020
• 1000 предыдущие методы тестирования, например:
def test_grouped_codes():
    app = create_app()
    instrument(app)
    # test stuff

Но я не могу «сбросить» реестр, поэтому все время получаю сообщение об ошибке «Дублированные таймсерии в CollectorRegistry».

Как я могу сбросить реестр (или установить его на пустой реестр) клиентской библиотеки Prometheus Python во время выполнения?

Среди прочего, я пробовал следующее, но оно работает не работает:

def create_app():
    app = Flask(__name__)
    registry = CollectorRegistry()  # Create new registry.
    prometheus_client.REGISTRY = registry  # Try to override global registry.
    prometheus_client.registry.REGISTRY = registry  # Try to override global registry.

    @app.route("/")
    def home():
        return "Hello World!"

    # More functions ...

    @app.route("/metrics")
    @FlaskInstrumentator.do_not_track()
    def metrics():
        data = generate_latest(registry)
        headers = {
            "Content-Type": CONTENT_TYPE_LATEST,
            "Content-Length": str(len(data))}
        return data, 200, headers

    return app

Я обнаружил следующее qa при переполнении стека здесь . @ brian-brazil рекомендует объявлять метрики на уровне модуля, но тогда мне пришлось бы жестко закодировать имена меток, которых я хотел бы избежать. Некоторые используют handler, другие method или path, поэтому я хочу оставить его настраиваемым.

1 Ответ

1 голос
/ 20 июня 2020

Хорошо, благодаря подсказке в этом ответе @Xitrum «метод отмены регистрации» я нашел решение:

collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
    REGISTRY.unregister(collector)

Теперь все тесты могут начинаться с их собственного реестра:

def test_metrics_endpoint_availability():
    app = create_app()
    FlaskInstrumentator(app).instrument()
    client = app.test_client()

    response = client.get("/")
    response = client.get("/metrics")
    
    # Test stuff
    

def test_grouped_status_codes():
    app = create_app()
    FlaskInstrumentator(app).instrument()
    client = app.test_client()

    client.get("/does_not_exist")  # Should be ignored.
    client.get("/does_not_exist")  # Should be ignored.
    client.post("/")
    client.post("/")
    
    # Test stuff
...