Как определить состояние работоспособности ETL из журналов временных рядов с помощью Elasticsearch? - PullRequest
0 голосов
/ 19 марта 2019

TL; DR: что такое Elasticsearch, эквивалентный этому запросу Postgres?

    SELECT latest_pipeline_logs.* FROM (
      SELECT pipeline_logs.*, 
      rank() OVER (
          PARTITION BY pipeline_name
          ORDER BY updated_at DESC
      )
      FROM pipeline_logs
    ) latest_pipeline_logs WHERE RANK = 1

У меня есть сотни конвейеров ETL с журналами, которые выгружаются в Elasticsearch. Каждый из них выполняется независимо друг от друга с разными интервалами. Я хотел бы получить простое состояние работоспособности для каждого из моих конвейеров ETL, используя агрегации Elasticsearch.

Каждый конвейер регистрирует свое состояние при выполнении. Мой текущий мыслительный процесс заключается в определении работоспособности каждого конвейера на основе двух наиболее важных состояний: succeeded и failed.

Я знаю, что могу сделать запрос агрегации и группировать по каждому конвейеру с субагрегацией для статусов. Например, что-то вроде этого:

{
  ...

  "aggs": {
    "pipelines": {
      "field": "pipeline_name"
    },
    "aggs": {
      "states": {
        "terms": {
          "field": "pipeline_state"
        }
      }
    }
  }
}

Проблема с приведенным выше примером заключается в том, что я мог получить несколько состояний из-за набора данных временных рядов, например, такого:

{
  "key": "some-pipeline-name",
  "buckets": [
    {
      "key": "succeeded",
      "doc_count": 123
    },
    {
      "key": "failed",
      "doc_count": 567
    }
  ]
}

Теоретически я мог бы отфильтровать результаты по дате выполнения конвейера, но поскольку некоторые конвейеры запускаются через месяц или около того, я не думаю, что это вариант.

Конечное состояние - управлять простой приборной панелью, используя набор результатов Elasticsearch, который выглядит примерно так:

[
  {
    "key": "some-pipeline-name",
    "latest-status": "succeeded"
  },
  {
    "key": "some-other-pipeline",
    "latest-status": "failed"
  }
]

Следует отметить, что в этом случае исторические данные не важны. Панель инструментов просто передаст последнее состояние для каждого конвейера.

Как бы вы достигли этого с Elasticsearch?

1 Ответ

1 голос
/ 21 марта 2019

Если вас интересует только последний статус для каждого конвейера, вы можете использовать top_hits в качестве субагрегации, а затем отсортировать по времени

{
  "size": 0,
  "aggs": {
    "pipeline": {
      "terms": {
        "field": "pipeline_name",
        "size": 1000
      },
      "aggs": {
        "top_hits_status": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "timestamp": {
                  "order": "desc"
                }
              }
            ],
            "_source": {
              "includes": [
                "pipeline_state"
              ]
            }
          }
        }
      }
    }
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...