Я работаю с огромным набором данных с примерно 450 000 наблюдений в день, и я был бы признателен за некоторые сведения о том, как я могу улучшить время выполнения приведенного ниже кода.
Я хочу рассчитать интенсивность поступления 6 разных групп в течение 90 секунд в течение дня. Интенсивность прибытия определяется как количество наблюдений, принадлежащих одной конкретной группе, деленное на общее количество наблюдений за последние 90 секунд. Проблема в том, что количество наблюдений за последние 90 секунд довольно сильно меняется в течение дня. Я построил пример, чтобы проиллюстрировать мою текущую лучшую реализацию. Приведенный ниже набор данных состоит из метки для типа наблюдения npData
и дополнительного времени, в которое оно произошло, npDeltaT
. Инкрементное время следует интерпретировать как время, прошедшее с момента получения предыдущего наблюдения, а кумулятивное время составляет весь день.
import numpy as np
import time
start = time.time()
# The size of the window to calculate the intensities over
T = 90
# The data
np.random.seed(2020)
npDeltaT = np.random.uniform(0,0.004,size=450000)
# Calculating the cumulative delta time
tIncreAccu = np.cumsum(npDeltaT)
#### For running intensity calculation
# Weighted
npData = np.random.choice(np.arange(0,6),size=450000,p=[0.1,0.3,0.2,0.05,0.3,0.05])
В настоящее время я имею дело с изменяющимися размерами последних 90 секунд. построить блок данных, содержащий все наблюдения за последние 90 секунд, итеративно go по каждому наблюдению, и обновить блок данных, чтобы он всегда отражал последние 90 секунд. Таким образом, мне нужно только go по всем данным один раз, а размер подмножества данных, по которому производится поиск, меньше, чем весь набор данных.
# Locating the first element which is above the treshold
firstOverT = np.where(tIncreAccu > T)[0][0]
# Creating a block to calculate on
tAccuBlock = tIncreAccu[0:firstOverT]
dataBlock = npData[0:firstOverT]
# Initialising and calculating the first set of intensities
intensities = [[len(dataBlock[dataBlock == i]) / len(dataBlock)] for i in np.arange(6)]
# Going through each timestep.
for i in np.arange(firstOverT,len(tIncreAccu)):
# Adding the new observation.
tAccuBlock = np.append(tAccuBlock,tIncreAccu[i])
dataBlock = np.append(dataBlock,npData[i])
# Locating the indices to drop at this step.
indicesToKeep = np.where(tAccuBlock>=(tIncreAccu[i]-T))[0]
# Removing irrelevant observations.
tAccuBlock = tAccuBlock[indicesToKeep]
dataBlock = dataBlock[indicesToKeep]
intenList = [len(dataBlock[dataBlock == i]) / len(dataBlock) for i in np.arange(6)]
# Adding the new intensities
intensities[0] += [intenList[0]]
intensities[1] += [intenList[1]]
intensities[2] += [intenList[2]]
intensities[3] += [intenList[3]]
intensities[4] += [intenList[4]]
intensities[5] += [intenList[5]]
end = time.time()
print('The processing took %.3f seconds.' % (end-start))
Я пробовал множество разных вещей, чтобы улучшить время выполнения, чтобы сохранить интенсивности в матрице вместо 6 отдельных списков, чтобы минимизировать количество назначений - без удачи. Приведенное выше решение является самым быстрым, которое мне удалось написать на данный момент, и для обработки всего набора данных все еще требуется около 700 секунд.
Предложение по улучшению моего собственного
Я подумал, что с помощью представление разреженной матрицы, хранящее соответствующие индексы в строках и столбцах, представляющих каждый временной шаг, может улучшить время выполнения, но очень медленно назначать индексы для каждого временного шага для разреженной матрицы. Мое предложение перечислено ниже.
# Sparse-matrix-implementation
start = time.time()
# The size of the window to calculate the intensities over
T = 1
# The data
np.random.seed(2020)
npDeltaT = np.random.uniform(0,0.004,size=450000)
# Calculating the cumulative delta time
tIncreAccu = np.cumsum(npDeltaT)
firstOverT = np.where(tIncreAccu > T)[0][0]
first = (tIncreAccu < tIncreAccu[firstOverT - 1]) & (tIncreAccu >= (tIncreAccu[firstOverT - 1]-T))
sparseMatWhere = sparse.csc_matrix(first).T
for i in np.arange(firstOverT,len(tIncreAccu)):#len(tIncreAccu)
sparseMatWhere = sparse.hstack([sparseMatWhere,
sparse.csc_matrix(np.array((tIncreAccu < tIncreAccu[i]) & (tIncreAccu >= (tIncreAccu[i]-T)))).T])
print(sparseMatWhere.shape)
end = time.time()
print('The processing took %.3f seconds.' % (end-start))
Я с удовольствием уточню некоторые вещи, которые я попробовал завтра, при необходимости.
Заранее, спасибо.