pandas.HDFS скопление агрегации с чанками - PullRequest
0 голосов
/ 23 декабря 2018

Как выбрать и объединить данные из pandas.HDFStore, разделенного на части по индексу (DatetimeIndex)?

Я хотел бы применить что-то вроде

data = data.drop_duplicates()    
data = data.groupby(pd.Grouper(freq='AS')).agg('mean')

, но разбивая данные на части.

Данные содержат дублированные индексы и не сортируются.Тем не менее, я не возражаю против сортировки, если вы можете сделать это частями.

Спасибо за вашу помощь!

Идеи:

  • for data in store.select('orig', chunksize=3): допускает только фиксированный размер фрагмента.

  • store.select('orig', where='index>="{}" & index<"{}"'.format('2017-01-01', '2018-01-01')) Может работать, но тогда не знает, как получить минимальное и максимальное значения индекса.

Воспроизведение

Сначала мы создаем DataFrame с некоторыми дублированными индексами и сохраняем его в файле HDF5.

import numpy as np
import pandas as pd
file = 'tmp.h5'
data = pd.DataFrame(np.random.rand(20, 2),
                    index=pd.to_datetime(['2016-01-01 00:00:00', '2016-03-01 00:00:00',
                                          '2017-01-01 00:00:00', '2017-03-01 00:00:00',
                                          '2016-01-01 00:00:00', '2016-02-01 00:00:00',
                                          '2018-01-01 00:00:00', '2015-03-01 00:00:00',
                                          '2018-02-01 00:00:00', '2018-01-01 00:00:00',
                                          '2017-01-01 00:00:00', '2018-06-01 00:00:00',
                                          '2016-02-01 00:00:00', '2018-07-01 00:00:00',
                                          '2015-01-01 00:00:00', '2017-04-01 00:00:00',
                                          '2017-01-01 00:00:00', '2016-03-01 00:00:00',
                                          '2018-01-01 00:00:00', '2016-02-01 00:00:00']),
                    columns=['A', 'B'])


with pd.HDFStore(file, complevel=9, complib='zlib',
                 fletcher32=True, mode='w') as store:
    store.put('orig', df, format='table', data_columns=True)

В данный момент я просто читаю во всехданные и обработка одновременно (однако я сейчас имею дело с MemoryErrors):

with pd.HDFStore(file) as store:
    expected = store.select('orig')

    # Removing duplicated entries
    expected = expected.drop_duplicates()  

    # Aggregate by year
    expected = expected.groupby(pd.Grouper(freq='AS')).agg('mean')

Могу ли я сделать то же самое по частям?(Важно то, что все записи с одинаковой частотой агрегации находятся в одном и том же фрагменте, в противном случае происходит сбой, как здесь):

with pd.HDFStore(file) as store:
    for data in store.select('orig', chunksize=3):
        # Removing duplicated entries
        data = data.drop_duplicates() 

        # Aggregate by year
        data = data.groupby(pd.Grouper(freq='AS')).agg('mean')
        store.append('agg', data)

    result = store.select('agg')

pd.testing.assert_frame_equal(expected, result)
> AssertionError
...