Я пытаюсь использовать многопроцессорную обработку на 12-ядерном компьютере для чтения файла Excel - файл размером 60 МБ с 15 листами и 10000 строк каждый. Импорт всех листов с помощью pandas.read_csv без распараллеливания занимает около 33 секунд.
Если я использую pool.map (), он работает, но занимает больше времени, чем непараллельная версия: 150 секунд против 33!
Если я использую pool.map_async (), это займет 36 секунд, но я не могу получить доступ (и поэтому не могу проверить) к выводу!
Мои вопросы:
- что я делаю не так? и pool.map, и pool.map_async занимают примерно
в то же время, даже если я установил nrows = 10 в read_single_sheet
функция; в то же время, читает ли он 10 строк или 10 000 - как это
возможно?
- Как я могу получить результаты pool.map_async ()? я пытался
output = [p.get() for p in dataframes]
но это не работает:
MapResult
объект не повторяется
- Является ли это скорее IO-привязанным, чем CPU
проблема? Тем не менее, почему pool.map занимает так много времени?
Чтение тех же данных из CSV (каждый лист Excel, сохраненный в отдельном CSV) занимает 2 секунды на моем аппарате. Тем не менее, CSV не очень хороший вариант для того, что мне нужно делать. Я часто имею от 10 до 20 вкладок среднего размера; Преобразование их вручную часто может занять больше времени, чем ожидание чтения пандами, плюс, если я получу обновленные версии, мне придется снова выполнить ручное преобразование.
Я знаю, что мог бы использовать скрипт VBA в Excel для автоматического сохранения каждого листа в CSV, но типы данных чаще всего выводятся правильно при чтении из Excel - не так с CSV, особенно для дат (мои даты никогда не бывают равны ISO гггг- mm-dd): мне нужно будет определить поля даты, указать формат и т. д. - просто чтение из Excel часто будет быстрее. Тем более, что эти задачи, как правило, одноразовые: я импортирую данные один раз, может быть, два или три раза, если получаю обновление, сохраняю их в SQL, а затем все мои скрипты Python читают из SQL.
Код, который я использую для чтения файла:
import numpy as np
import pandas as pd
import time
import multiprocessing
from multiprocessing import Pool
def parallel_read():
pool = Pool(num_cores)
# reads 1 row only, to retrieve column names and sheet names
mydic = pd.read_excel('excel_write_example.xlsx', nrows=1, sheet_name=None)
sheets =[]
for d in mydic:
sheets.extend([d])
dataframes = pool.map( read_single_sheet , sheets )
return dataframes
def parallel_read_async():
pool = Pool(num_cores)
# reads 1 row only, to retrieve column names and sheet names
mydic = pd.read_excel('excel_write_example.xlsx', nrows=1, sheet_name=None)
sheets =[]
for d in mydic:
sheets.extend([d])
dataframes = pool.map_async( read_single_sheet , sheets )
output = None
# this below doesn`t work - can`t understand why
output = [p.get() for p in dataframes]
return output
def read_single_sheet(sheet):
out = pd.read_excel('excel_write_example.xlsx', sheet_name=sheet )
return out
num_cores = multiprocessing.cpu_count()
if __name__=='__main__':
start=time.time()
out_p = parallel_read()
time_par = time.time() -start
out_as = parallel_read_async()
time_as = time.time() - start - time_par
Код, который я использовал для создания Excel:
import numpy as np
import pandas as pd
sheets = 15
rows= int(10e3)
writer = pd.ExcelWriter('excel_write_example.xlsx')
def create_data(sheets, rows):
df = {} # dictionary of dataframes
for i in range(sheets):
df[i] = pd.DataFrame(data= np.random.rand(rows,30) )
df[i]['a'] = 'some long random text'
df[i]['b'] = 'some more random text'
df[i]['c'] = 'yet more text'
return df
def data_to_excel(df, writer):
for d in df:
df[d].to_excel(writer, sheet_name = str(d), index=False)
writer.close()
df = create_data(sheets, rows)
data_to_excel(df, writer)