Преобразование данных запаса OHLC в другой таймфрейм с помощью python и pandas - PullRequest
15 голосов
/ 30 марта 2012

Может ли кто-нибудь указать мне правильное направление в отношении преобразования таймфрейма данных OHLC с помощью Pandas ?Я пытаюсь создать Dataframe с данными для более высоких таймфреймов, учитывая данные с меньшими таймфреймами.

Например, если у меня есть следующие данные за одну минуту (M1):

                       Open    High     Low   Close  Volume
Date                                                       
1999-01-04 10:22:00  1.1801  1.1819  1.1801  1.1817       4
1999-01-04 10:23:00  1.1817  1.1818  1.1804  1.1814      18
1999-01-04 10:24:00  1.1817  1.1817  1.1802  1.1806      12
1999-01-04 10:25:00  1.1807  1.1815  1.1795  1.1808      26
1999-01-04 10:26:00  1.1803  1.1806  1.1790  1.1806       4
1999-01-04 10:27:00  1.1801  1.1801  1.1779  1.1786      23
1999-01-04 10:28:00  1.1795  1.1801  1.1776  1.1788      28
1999-01-04 10:29:00  1.1793  1.1795  1.1782  1.1789      10
1999-01-04 10:31:00  1.1780  1.1792  1.1776  1.1792      12
1999-01-04 10:32:00  1.1788  1.1792  1.1788  1.1791       4

, которые имеют значения Open, High, Low, Close (OHLC) и объема для каждой минуты Iхотел бы построить набор 5-минутных чтений (M5), который будет выглядеть так:

                       Open    High     Low   Close  Volume
Date                                                       
1999-01-04 10:25:00  1.1807  1.1815  1.1776  1.1789      91
1999-01-04 10:30:00  1.1780  1.1792  1.1776  1.1791      16

Итак, рабочий процесс таков:

  • Open - это Open of theпервая строка во временном окне
  • Максимум - самый высокий Максимум в временном окне
  • Низкий - самый низкий Минимум
  • Закрытие - это последнее Закрытие
  • Громкость простосумма томов

Существует несколько проблем:

  • в данных есть пробелы (обратите внимание, что в строке 10:30:00 нет)
  • 5-минутные интервалы должны начинаться во время раунда, например, M5 начинается в 10:25:00, а не в 10: 22: 00
  • ; во-первых, неполный набор может быть опущен, как в этом примере, или включен (поэтому мыможет иметь 10:20:00 5-минутный ввод)

В документации Pandas по выборке вверх-вниз приводится пример, но они используют среднее значение as значение строки с передискретизацией, которая здесь не сработает.Я пытался использовать groupby и agg, но безрезультатно.Для того, чтобы получить наивысший максимум и наименьший минимум, может быть не так сложно, но я понятия не имею, как получить первый Open и последний Close.

То, что я пробовал, выглядит примерно так:

grouped = slice.groupby( dr5minute.asof ).agg( 
    { 'Low': lambda x : x.min()[ 'Low' ], 'High': lambda x : x.max()[ 'High' ] } 
)

но это приводит к следующей ошибке, которую я не понимаю:

In [27]: grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <module>()
----> 1 grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in agg(self, func, *args, **kwargs)
    242         See docstring for aggregate
    243         """
--> 244         return self.aggregate(func, *args, **kwargs)
    245 
    246     def _iterate_slices(self):

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, arg, *args, **kwargs)
   1153                     colg = SeriesGroupBy(obj[col], column=col,
   1154                                          grouper=self.grouper)
-> 1155                     result[col] = colg.aggregate(func)
   1156 
   1157             result = DataFrame(result)

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, func_or_funcs, *args, **kwargs)
    906                 return self._python_agg_general(func_or_funcs, *args, **kwargs)
    907             except Exception:
--> 908                 result = self._aggregate_named(func_or_funcs, *args, **kwargs)
    909 
    910             index = Index(sorted(result), name=self.grouper.names[0])

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in _aggregate_named(self, func, *args, **kwargs)
    976             grp = self.get_group(name)
    977             grp.name = name
--> 978             output = func(grp, *args, **kwargs)
    979             if isinstance(output, np.ndarray):
    980                 raise Exception('Must produce aggregated value')

/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <lambda>(x)
----> 1 grouped = slice.groupby( dr5minute.asof ).agg( { 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] } )

IndexError: invalid index to scalar variable.

Так что любая помощь по этому вопросу будет принята с благодарностью.Если выбранный путь не сработает, предложите другой относительно эффективный подход (у меня миллионы строк).Некоторые ресурсы по использованию Pandas для финансовой обработки также подойдут.

Ответы [ 4 ]

14 голосов
/ 07 апреля 2016

просто для того, чтобы помочь другим пользователям с более свежей версией Pandas, есть метод resample , очень быстрый и полезный для выполнения той же задачи:

ohlc_dict = {                                                                                                             
'Open':'first',                                                                                                    
'High':'max',                                                                                                       
'Low':'min',                                                                                                        
'Close': 'last',                                                                                                    
'Volume': 'sum'
}

df.resample('5T', how=ohlc_dict, closed='left', label='left')
12 голосов
/ 02 апреля 2012

Ваш подход обоснован, но терпит неудачу, потому что каждая функция в dict-of-functions применяется к agg () получает объект Series, отражающий столбец, соответствующий значению ключа. Поэтому нет необходимости Снова отфильтруйте метку столбца. С этим, и предполагая, что groupby сохраняет порядок, Вы можете нарезать серию, чтобы извлечь первый / последний элемент Open / Close столбцы (примечание: групповая документация не претендует на сохранение порядка исходных данных серия, но, кажется, на практике.)

In [50]: df.groupby(dr5minute.asof).agg({'Low': lambda s: s.min(), 
                                         'High': lambda s: s.max(),
                                         'Open': lambda s: s[0],
                                         'Close': lambda s: s[-1],
                                         'Volume': lambda s: s.sum()})
Out[50]: 
                      Close    High     Low    Open  Volume
key_0                                                      
1999-01-04 10:20:00  1.1806  1.1819  1.1801  1.1801      34
1999-01-04 10:25:00  1.1789  1.1815  1.1776  1.1807      91
1999-01-04 10:30:00  1.1791  1.1792  1.1776  1.1780      16

Для справки, вот таблица для подведения итогов ожидаемого типы ввода и вывода функции агрегирования на основе типа объекта groupby и того, как функции (функции) агрегации передаются в agg ().

                  agg() method     agg func    agg func          agg()
                  input type       accepts     returns           result
GroupBy Object
SeriesGroupBy     function         Series      value             Series
                  dict-of-funcs    Series      value             DataFrame, columns match dict keys
                  list-of-funcs    Series      value             DataFrame, columns match func names
DataFrameGroupBy  function         DataFrame   Series/dict/ary   DataFrame, columns match original DataFrame
                  dict-of-funcs    Series      value             DataFrame, columns match dict keys, where dict keys must be columns in original DataFrame
                  list-of-funcs    Series      value             DataFrame, MultiIndex columns (original cols x func names)

Из таблицы выше, если для агрегации требуется доступ более чем к одному столбец, единственный вариант - передать одну функцию Объект DataFrameGroupBy. Поэтому альтернативный способ выполнить исходную задачу состоит в том, чтобы определить функция, подобная следующей:

def ohlcsum(df):
    df = df.sort()
    return {
       'Open': df['Open'][0],
       'High': df['High'].max(),
       'Low': df['Low'].min(),
       'Close': df['Close'][-1],
       'Volume': df['Volume'].sum()
      }

и примените к нему agg ():

In [30]: df.groupby(dr5minute.asof).agg(ohlcsum)
Out[30]: 
                       Open    High     Low   Close  Volume
key_0                                                      
1999-01-04 10:20:00  1.1801  1.1819  1.1801  1.1806      34
1999-01-04 10:25:00  1.1807  1.1815  1.1776  1.1789      91
1999-01-04 10:30:00  1.1780  1.1792  1.1776  1.1791      16

Хотя панды могут предложить более чистую встроенную магию в будущем, надеюсь, это объясняет, как работать с современными возможностями agg ().

0 голосов
/ 21 января 2019
bitfinex_klines_pd = bitfinex_klines_pd.resample('4h').agg({
    'open': lambda s: s[0],
    'high': lambda df: df.max(),
    'low': lambda df: df.min(),
    'close': lambda df: df[-1],
    'volume': lambda df: df.sum()
})
0 голосов
/ 13 июня 2017

В моей функции main () я получаю потоковые данные bid / ask.Затем я делаю следующее:

df = pd.DataFrame([])

for msg_type, msg in response.parts():
    if msg_type == "pricing.Price":
        sd = StreamingData(datetime.now(),instrument_string(msg),
                           mid_string(msg),account_api,account_id,
                           's','5min',balance)
        df = df.append(sd.df())
        sd.resample(df)

Я создал класс StreamingData () , который принимает предоставленный вход (также создал некоторые функции для разбиения данных типа bid / ask на отдельные компоненты (Bid, Ask, Mid, инструмент и т. д.).

Прелесть этого всего, что вам нужно сделать, это изменить 's' и '5min' на любые временные рамки, которые вы хотите. Установите «m» и «D», чтобы получать ежедневные цены за минуту.

Вот как выглядит моя StreamingData () :

class StreamingData(object):
def __init__(self, time, instrument, mid, api, _id, xsec, xmin, balance):
    self.time = time
    self.instrument = instrument
    self.mid = mid
    self.api = api
    self._id = _id
    self.xsec = xsec
    self.xmin = xmin
    self.balance = balance
    self.data = self.resample(self.df())

def df(self):
    df1 = pd.DataFrame({'Time':[self.time]})
    df2 = pd.DataFrame({'Mid':[float(self.mid)]})
    df3 = pd.concat([df1,df2],axis=1,join='inner')
    df = df3.set_index(['Time'])
    df.index = pd.to_datetime(df.index,unit='s')
    return df

def resample(self, df):
    xx = df.to_period(freq=self.xsec)
    openCol = xx.resample(self.xmin).first()
    highCol = xx.resample(self.xmin).max()
    lowCol = xx.resample(self.xmin).min()
    closeCol = xx.resample(self.xmin).last()
    self.data = pd.concat([openCol,highCol,lowCol,closeCol],
                           axis=1,join='inner')
    self.data['Open'] = openCol.round(5)
    self.data['High'] = highCol.round(5)
    self.data['Low'] = lowCol.round(5)
    self.data['Close'] = closeCol.round(5)
    return self.data

Таким образом, он получает данные из StreamingData () , создает индексированный по времени фрейм данных в df () , добавляет его, затем отправляет через resample () . Цены, которые я рассчитываю, основаны на: mid = (bid + ask) / 2

...