Агрегация Elasticsearch до pandas данных - PullRequest
2 голосов
/ 07 января 2020

Я работаю с некоторыми данными ElasticSearch, и я хотел бы сгенерировать таблицы из агрегатов, как в Kibana. Ниже приведен пример выходных данных агрегирования на основе следующего кода:

    s.aggs.bucket("name1", "terms", field="field1").bucket(
        "name2", "terms", field="innerField1"
    ).bucket("name3", "terms", field="InnerAgg1")
     response = s.execute()
   resp_dict = response.aggregations.name.buckets




{
    "key": "Locationx",
    "doc_count": 12,
    "name2": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "Sub-Loc1",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }, {
            "key": "Sub-Loc2",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }]
    }
}

В этом случае ожидаемый результат будет:

Expected Output

Теперь, я попробовал множество методов, с кратким описанием того, что пошло не так:

Pandasticsearch = полностью потерпел неудачу даже с одним словарём. Словарь не был создан, так как он боролся с ключами, даже если каждый словарь обрабатывался отдельно:

for d in resp_dict :
    x= d.to_dict()
    pandas_df = Select.from_dict(x).to_pandas()
    print(pandas_df)

В частности, была получена ошибка, связанная с тем, что словарь не был создан и, таким образом, ['взял'] не был ключом.

Pandas (pd.Dataframe.from_records ()) = дал мне только первое объединение со столбцом, содержащим внутренний словарь, а использование pd.apply (pd.Series) дало еще одну таблицу результирующие словари.

StackOverflow posts рекурсивная функция = словарь выглядит совершенно иначе, чем в используемом примере, и манипуляция ни к чему не привела, если я радикально не изменил ввод.

1 Ответ

1 голос
/ 22 января 2020

Борясь с той же проблемой, я пришел к выводу, что причина этого в том, что response_dict - это не нормальные диктанты, а elasticsearch_dsl.utils.AttrList из elasticsearch_dsl.utils.AttrDict.

Если у вас есть AttrList из AttrDicts, можно сделать:

resp_dict = response.aggregations.name.buckets
new_response = [i._d_ for i in resp_dict]

Вместо этого получить список обычных диктов. Это, вероятно, будет лучше работать с другими библиотеками.

Редактировать:

Я написал рекурсивную функцию, которая, по крайней мере, обрабатывает некоторые случаи, хотя еще и не была тщательно протестирована и не обернута в хороший модуль или что-то еще. Это всего лишь сценарий. Функция one_lvl отслеживает всех братьев и сестер родителей в дереве в словаре с именем tmp и выполняет рекурсивный поиск при обнаружении нового именованного агрегата. Он предполагает многое о структуре данных, что, я не уверен, оправдано в общем случае.

Необходима вещь lvl, я думаю, потому что у вас могут быть повторяющиеся имена, поэтому key например, существует на нескольких уровнях агрегации.

#!/usr/bin/env python3

from elasticsearch_dsl.query import QueryString
from elasticsearch_dsl import Search, A
from elasticsearch import Elasticsearch
import pandas as pd

PORT = 9250
TIMEOUT = 10000
USR = "someusr"
PW = "somepw"
HOST = "test.com"
INDEX = "my_index"
QUERY = "foobar"

client = Elasticsearch([HOST], port = PORT, http_auth=(USR, PW), timeout = TIMEOUT)

qs = QueryString(query = QUERY)
s = Search(using=client, index=INDEX).query(qs)

s = s.params(size = 0)

agg= {
    "dates" : A("date_histogram", field="date", interval="1M", time_zone="Europe/Berlin"),
    "region" : A("terms", field="region", size=10),
    "county" : A("terms", field="county", size = 10)
}

s.aggs.bucket("dates", agg["dates"]). \
       bucket("region", agg["region"]). \
       bucket("county", agg["county"])

resp = s.execute()

data = {"buckets" : [i._d_ for i in resp.aggregations.dates]}
rec_list = ["buckets"] + [*agg.keys()]

def get_fields(i, lvl):
    return {(k + f"{lvl}"):v for k, v in i.items() if k not in rec_list}

def one_lvl(data, tmp, lvl, rows, maxlvl):
    tmp = {**tmp, **get_fields(data, lvl)}

    if "buckets" not in data:
        rows.append(tmp)

    for d in data:
        if d in ["buckets"]:
            for v, b in enumerate(data[d]):
                tmp = {**tmp, **get_fields(data[d][v], lvl)}
                for k in b:
                    if k in agg.keys():
                        one_lvl(data[d][v][k], tmp, lvl+1, rows, maxlvl)
                    else:
                        if lvl == maxlvl:
                            tmp = {**tmp, (k + f"{lvl}") : data[d][v][k]}
                            rows.append(tmp)

    return rows


rows = one_lvl(data, {}, 1, [], len(agg))
df = pd.DataFrame(rows)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...