TL; DR: что такое Elasticsearch, эквивалентный этому запросу Postgres?
SELECT latest_pipeline_logs.* FROM (
SELECT pipeline_logs.*,
rank() OVER (
PARTITION BY pipeline_name
ORDER BY updated_at DESC
)
FROM pipeline_logs
) latest_pipeline_logs WHERE RANK = 1
У меня есть сотни конвейеров ETL с журналами, которые выгружаются в Elasticsearch. Каждый из них выполняется независимо друг от друга с разными интервалами. Я хотел бы получить простое состояние работоспособности для каждого из моих конвейеров ETL, используя агрегации Elasticsearch.
Каждый конвейер регистрирует свое состояние при выполнении. Мой текущий мыслительный процесс заключается в определении работоспособности каждого конвейера на основе двух наиболее важных состояний: succeeded
и failed
.
Я знаю, что могу сделать запрос агрегации и группировать по каждому конвейеру с субагрегацией для статусов. Например, что-то вроде этого:
{
...
"aggs": {
"pipelines": {
"field": "pipeline_name"
},
"aggs": {
"states": {
"terms": {
"field": "pipeline_state"
}
}
}
}
}
Проблема с приведенным выше примером заключается в том, что я мог получить несколько состояний из-за набора данных временных рядов, например, такого:
{
"key": "some-pipeline-name",
"buckets": [
{
"key": "succeeded",
"doc_count": 123
},
{
"key": "failed",
"doc_count": 567
}
]
}
Теоретически я мог бы отфильтровать результаты по дате выполнения конвейера, но поскольку некоторые конвейеры запускаются через месяц или около того, я не думаю, что это вариант.
Конечное состояние - управлять простой приборной панелью, используя набор результатов Elasticsearch, который выглядит примерно так:
[
{
"key": "some-pipeline-name",
"latest-status": "succeeded"
},
{
"key": "some-other-pipeline",
"latest-status": "failed"
}
]
Следует отметить, что в этом случае исторические данные не важны. Панель инструментов просто передаст последнее состояние для каждого конвейера.
Как бы вы достигли этого с Elasticsearch?