Оптимизировать функцию, которая генерирует данные из больших тиковых данных - PullRequest
0 голосов
/ 10 июня 2018

У меня есть такой фрейм данных:

df_[['Price', 'Volume', 'Open', 'High', 'Low']]
Out[16]: 
                               Price  Volume  Open   High    Low
datetime                                                        
2016-05-01 22:00:00.334338092  45.90      20  45.9    NaN    NaN
2016-05-01 22:00:00.335312958    NaN       1  45.9    NaN    NaN
2016-05-01 22:00:00.538377726  45.92       1  45.9  45.90  45.90
2016-05-01 22:00:00.590386619  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.590493308  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.590493308  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.590493308  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.590493308  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.590493308  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.590493308  45.92       3  45.9  45.92  45.90
2016-05-01 22:00:00.590493308  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.591269949  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.591269949  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.591269949  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.707288056  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.719267600  45.92       2  45.9  45.92  45.90
2016-05-01 22:00:00.719267600  45.91       1  45.9  45.92  45.90
2016-05-01 22:00:00.731272008  45.92       1  45.9  45.92  45.90
2016-05-01 22:00:00.731272008  45.91       1  45.9  45.92  45.90
2016-05-01 22:00:00.738358786  45.92       1  45.9  45.92  45.90
(..omitted rows)

Из этого фрейма данных я определил функцию, которая генерирует новый фрейм данных:

res
Out[18]: 
                                High    Low  Open  Price  Volume
datetime                                                        
2016-05-01 22:00:00.334338092    NaN    NaN  45.9  45.90      20
2016-05-01 22:00:00.590493308    NaN    NaN  45.9  45.92      11
2016-05-01 22:00:00.731272008  45.92  45.90  45.9  45.91      10
2016-05-01 22:00:00.759276398  45.92  45.90  45.9  45.92      11
2016-05-01 22:00:00.927307727  45.92  45.90  45.9  45.90      36
2016-05-01 22:00:01.054379713  45.92  45.90  45.9  45.89      10
2016-05-01 22:00:01.251324161  45.92  45.89  45.9  45.92      10
2016-05-01 22:00:03.210540968  45.92  45.89  45.9  45.92      11
2016-05-01 22:00:04.450664460  45.92  45.89  45.9    NaN      10
2016-05-01 22:00:07.426789217  45.92  45.89  45.9  45.93      10
2016-05-01 22:00:10.394898254  45.96  45.89  45.9  45.93      10
2016-05-01 22:00:13.359080034  45.96  45.89  45.9  45.92      11
2016-05-01 22:00:17.434346718  45.96  45.89  45.9  45.92      17
2016-05-01 22:00:21.918598002  45.96  45.89  45.9  45.95      10
2016-05-01 22:00:28.587010136  45.96  45.89  45.9  45.94      10
2016-05-01 22:00:32.103168386  45.96  45.89  45.9  45.93      10
2016-05-01 22:01:04.451829835  45.96  45.89  45.9  45.94      14
2016-05-01 22:01:12.662589219  45.96  45.89  45.9  45.94      10
2016-05-01 22:01:17.823792647  45.96  45.89  45.9  45.94      10
2016-05-01 22:01:22.399158701  45.96  45.89  45.9  45.93      11
2016-05-01 22:01:23.511242124  45.96  45.89  45.9  45.92      10
(..omitted rows)

Эта функция имеет два параметра: df(dataframe)n(size of Volume, for above, n=10).Начиная с первой даты date_1, рассчитайте совокупную сумму объема, затем, если совокупная сумма объема больше или равна n , этот момент равен date_2.Таким образом, этот блок от date_1 до date_2 агрегируется в одну строку следующим образом:

datetime : date_2

Price : price at date_2

Volume : sum of volume from date_1 to date_2

Open : price at date_1

High : max of high from date_1 to date_2

Low : min of low from date-1 to date_2

Do this to end of dataframe. 

Моя проблема в том, что мой входной фрейм данных имеет 60000000 строк.Для агрегирования данных, как указано выше, требуется слишком много времени.Я хочу оптимизировать мой код для функции.Вот мой код:

def tick_to_volume(df, n):
    flag = True
    np_df = np.array(df) #convert to numpy array
    res = pd.DataFrame()
    total_index = 0
    cum_n = 0
    cum_sum = np_df[total_index:,1].cumsum() #cumulative sum of volume
    while(flag):
        cum_n += n
        ix = (cum_sum[total_index:]>=cum_n).argmax() #index when cumulative sum of volume is greater or equal to n
        total_index += ix

        if (ix==0) and (np_df[total_index,4] < n): #for case that all cumulative sum of volume is less than n 
            return res

        cum_n = cum_sum[total_index]                       
        np_df_to_agg = np_df[total_index-ix:(total_index+1), :] #data to be aggregated

        data = {'datetime' : df.index[total_index],
                'Open' : np_df_to_agg[0,2],
                'High' : max(np_df_to_agg[:,3]),
                'Low': min(np_df_to_agg[:,4]),
                'Price' : np_df_to_agg[-1,0],
                'Volume' : sum(np_df_to_agg[:,1])}

        df_to_append = pd.DataFrame([data])
        df_to_append.set_index('datetime', inplace=True)
        res = pd.concat([res, df_to_append])
        total_index += 1

Ответы [ 2 ]

0 голосов
/ 10 июня 2018

Вот частично векторизованный подход.Идея состоит в том, чтобы разбить проблему на две части.

  1. Определить индексы, где каждая группа начинается и заканчивается.
  2. Выполните groupby + agg, используя вашу собственную логику.

Вторая часть проста.Первая часть может быть эффективно выполнена с небольшим трудом + numba.

Итерируем по df.Volume, отслеживая накопленную сумму x.Каждый раз, когда x превышает n, мы помечаем строку для будущего использования и устанавливаем x = 0.После этого у нас есть ряд показателей, показывающих, где заканчивается каждая группа.Немного помассируя и позаботившись о первой / последней группе, мы можем превратить df.Break в серию идентификаторов и перейти к следующему шагу.

import numpy as np
from numba import njit

n = 10


@njit(fastmath=True)
def find_breaks(vols, breaks):
    N = len(vols)
    acc = 0
    for i in range(N):
        acc += vols[i]
        if acc >= n:
            acc = 0
        breaks[i] = acc
    return


# create a blank column to store group ids
df["Break"] = np.nan
# mark points where volumes spill over a threshold
find_breaks(df.Volume.values, df.Break.values)
# populate the ids implied by thresholds
df["Break"] = (df.Break == 0).astype(np.float).replace(0, np.nan).cumsum().bfill()
# handle the last group
df["Break"] = df.Break.fillna(df.Break.max() + 1)

# define an aggregator
aggregator = {
    "Date": "last",
    "Price": "last",
    "Volume": "sum",
    "Open": "first",
    "High": "max",
    "Low": "min",
}

res = df.groupby("Break").agg(aggregator)

# Date  Price  Volume  Open   High   Low
# Break
# 1.0    22:00:00.334338092  45.90      20  45.9    NaN   NaN
# 2.0    22:00:00.590493308  45.92      11  45.9  45.92  45.9
# 3.0    22:00:00.731272008  45.91      10  45.9  45.92  45.9
# 4.0    22:00:00.738358786  45.92       1  45.9  45.92  45.9
0 голосов
/ 10 июня 2018

Повтор append() имеет катастрофические результаты в Pandas и NumPy.Поэтому вместо этого:

res = pd.DataFrame()
while True:
    df_to_append.set_index('datetime', inplace=True)
    res = pd.concat([res, df_to_append])

Сделайте это:

res = []
while True:
    res.append(df_to_append)
res = pd.concat(res)
res.set_index('datetime', inplace=True)

Вы также можете упростить вещи, сохранив data как кортеж вместо dict.Ключи всегда одинаковы, и если вы просто игнорируете их, вы можете заполнить res списком кортежей в цикле, избегая в дальнейшем создания многих временных DataFrames и поиска ключей.

...