У меня есть база данных Elasticsearch, и в ней много данных временных рядов.Я хотел бы загружать эти данные в память пакетами, помещать их в Dataframe, выполнять некоторые операции, а затем сокращать эти данные с помощью будущих Dataframes.
time_ranges = [(t1, t2), {t2, t3), (t3, t4)]
data_chunk0 = elastic_query(time_ranges[0])
data_chunk1 = elastic_query(time_ranges[1])
data_chunk2 = elastic_query(time_ranges[2])
(где эластичный запрос - это просто функция, которая выполняет запрос по временному диапазону и возвращает кадр данных, для некоторых элементов, где каждый элемент имеет несколько столбцов данных)
items | col1 | col2 | col3|
item 1| 10 | True | 5 |
item 2| 11 | True | 9 |
item 1| 90 | False | 5 |
Я хотел бы сделать следующее: для data_chunk0 взять максимум столбцов ... затем для data_chunk1 взять максимум столбцов ... объединить эти два результата и взять максимум из них.Затем возьмите этот результат и сравните с data_chunk2 и снова получите максимальные данные.Вот как это будет выглядеть в следующей последовательности:
result = pd.concat([data_chunk0.groupby('items').max(), data_chunk1.groupby('items').max()]).groupby('items).max()
pd.concat([result, data_chunk2.groupby('items').max()]).groupby('items).max()
Однако я не хочу загружать данные в память для этого.Я хотел бы сделать это постепенно, пара за один раз.Я могу сделать это, используя метод Reduction в Python, вот так:
def reduce_ranges(range1, range2):
#here, load data for range1 and range 2 from Elastic
return pd.concat([range1.groupby('items').max(), range2.groupby('items').max()], ignore_index=True).groupby('items').max().reset_index()
result = reduce((lambda x, y: reduce_ranges(x, y)), time_ranges)
Но тогда проблема в том, что мне действительно нужно добавить столбец в функцию сокращения, и я не хочу обрабатыватьпервый диапазон дважды (потому что для всех пар фреймов данных, кроме первого, первый диапазон уже будет обработан предыдущим шагом уменьшения.)
Существует ли более элегантный способ сделать это, не используя forloop?
Я посмотрел в «карту», а затем «уменьшить», но, похоже, мне нужно было бы загрузить все свои данные в список для обработки функции карты, прежде чем сокращать.
Итак, как бы мне:
Учитывая список временных диапазонов, загрузить данные для первого временного диапазона ... найти максимальное значение для столбцов .... затем загрузить данные для второго разадиапазон ... объединить с MAX RESULT первого ... вернуть новый максимальный фрейм данных ... затем загрузить данные для третьего временного диапазона и т. д.
Я знаю, что могу решить эту проблему, используя для циклов, ноМне интересно, есть ли лучший, более приемлемый путь ... В идеале, который мог бы обеспечить параллелизм в будущем.