Работа с подмножествами данных разных размеров - предложения по повышению эффективности приветствуются - PullRequest
0 голосов
/ 16 февраля 2020

Я работаю с огромным набором данных с примерно 450 000 наблюдений в день, и я был бы признателен за некоторые сведения о том, как я могу улучшить время выполнения приведенного ниже кода.

Я хочу рассчитать интенсивность поступления 6 разных групп в течение 90 секунд в течение дня. Интенсивность прибытия определяется как количество наблюдений, принадлежащих одной конкретной группе, деленное на общее количество наблюдений за последние 90 секунд. Проблема в том, что количество наблюдений за последние 90 секунд довольно сильно меняется в течение дня. Я построил пример, чтобы проиллюстрировать мою текущую лучшую реализацию. Приведенный ниже набор данных состоит из метки для типа наблюдения npData и дополнительного времени, в которое оно произошло, npDeltaT. Инкрементное время следует интерпретировать как время, прошедшее с момента получения предыдущего наблюдения, а кумулятивное время составляет весь день.

import numpy as np
import time

start = time.time()

# The size of the window to calculate the intensities over
T = 90

# The data
np.random.seed(2020)
npDeltaT = np.random.uniform(0,0.004,size=450000)
# Calculating the cumulative delta time
tIncreAccu = np.cumsum(npDeltaT)

#### For running intensity calculation
# Weighted
npData = np.random.choice(np.arange(0,6),size=450000,p=[0.1,0.3,0.2,0.05,0.3,0.05])

В настоящее время я имею дело с изменяющимися размерами последних 90 секунд. построить блок данных, содержащий все наблюдения за последние 90 секунд, итеративно go по каждому наблюдению, и обновить блок данных, чтобы он всегда отражал последние 90 секунд. Таким образом, мне нужно только go по всем данным один раз, а размер подмножества данных, по которому производится поиск, меньше, чем весь набор данных.

# Locating the first element which is above the treshold
firstOverT = np.where(tIncreAccu > T)[0][0]

# Creating a block to calculate on
tAccuBlock = tIncreAccu[0:firstOverT]
dataBlock = npData[0:firstOverT]

# Initialising and calculating the first set of intensities
intensities = [[len(dataBlock[dataBlock == i]) / len(dataBlock)] for i in np.arange(6)]

# Going through each timestep.
for i in np.arange(firstOverT,len(tIncreAccu)):

  # Adding the new observation.
  tAccuBlock = np.append(tAccuBlock,tIncreAccu[i])
  dataBlock = np.append(dataBlock,npData[i])

  # Locating the indices to drop at this step.
  indicesToKeep = np.where(tAccuBlock>=(tIncreAccu[i]-T))[0]
  # Removing irrelevant observations.
  tAccuBlock = tAccuBlock[indicesToKeep]
  dataBlock = dataBlock[indicesToKeep]

  intenList = [len(dataBlock[dataBlock == i]) / len(dataBlock) for i in np.arange(6)]
  # Adding the new intensities
  intensities[0] += [intenList[0]]
  intensities[1] += [intenList[1]]
  intensities[2] += [intenList[2]]
  intensities[3] += [intenList[3]]
  intensities[4] += [intenList[4]]
  intensities[5] += [intenList[5]]

end = time.time()
print('The processing took %.3f seconds.' % (end-start))

Я пробовал множество разных вещей, чтобы улучшить время выполнения, чтобы сохранить интенсивности в матрице вместо 6 отдельных списков, чтобы минимизировать количество назначений - без удачи. Приведенное выше решение является самым быстрым, которое мне удалось написать на данный момент, и для обработки всего набора данных все еще требуется около 700 секунд.

Предложение по улучшению моего собственного

Я подумал, что с помощью представление разреженной матрицы, хранящее соответствующие индексы в строках и столбцах, представляющих каждый временной шаг, может улучшить время выполнения, но очень медленно назначать индексы для каждого временного шага для разреженной матрицы. Мое предложение перечислено ниже.

# Sparse-matrix-implementation
start = time.time()
# The size of the window to calculate the intensities over
T = 1
# The data
np.random.seed(2020)
npDeltaT = np.random.uniform(0,0.004,size=450000)
# Calculating the cumulative delta time
tIncreAccu = np.cumsum(npDeltaT)

firstOverT = np.where(tIncreAccu > T)[0][0]
first = (tIncreAccu < tIncreAccu[firstOverT - 1]) & (tIncreAccu >= (tIncreAccu[firstOverT - 1]-T))
sparseMatWhere = sparse.csc_matrix(first).T

for i in np.arange(firstOverT,len(tIncreAccu)):#len(tIncreAccu)
  sparseMatWhere = sparse.hstack([sparseMatWhere,
                                  sparse.csc_matrix(np.array((tIncreAccu < tIncreAccu[i]) & (tIncreAccu >= (tIncreAccu[i]-T)))).T])

print(sparseMatWhere.shape)

end = time.time()
print('The processing took %.3f seconds.' % (end-start))

Я с удовольствием уточню некоторые вещи, которые я попробовал завтра, при необходимости.

Заранее, спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...