У меня есть папка (7,7 ГБ) с несколькими кадрами данных панд, которые хранятся в формате файла паркета.Мне нужно загрузить все эти кадры данных в словаре Python, но, поскольку у меня есть только 32 ГБ ОЗУ, я использую метод .loc, чтобы просто загрузить нужные мне данные.
Когда все кадры данных загружаются в памятьв python я создаю общий индекс из индексов всех данных, а затем переиндексирую все кадры данных с новым индексом.
Я разработал два сценария для этого, первый в классическомпоследовательный способ, второй использует Dask в oder, чтобы получить некоторое улучшение производительности от всех ядер моего Threadripper 1920x.
последовательный код:
# Standard library imports
import os
import pathlib
import time
# Third party imports
import pandas as pd
# Local application imports
class DataProvider:
def __init__(self):
self.data = dict()
def load_parquet(self, source_dir: str, timeframe_start: str, timeframe_end: str) -> None:
t = time.perf_counter()
symbol_list = list(file for file in os.listdir(source_dir) if file.endswith('.parquet'))
# updating containers
for symbol in symbol_list:
path = pathlib.Path.joinpath(pathlib.Path(source_dir), symbol)
name = symbol.replace('.parquet', '')
self.data[name] = pd.read_parquet(path).loc[timeframe_start:timeframe_end]
print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')
t = time.perf_counter()
# building index
index = None
for symbol in self.data:
if index is not None:
index.union(self.data[symbol].index)
else:
index = self.data[symbol].index
print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')
t = time.perf_counter()
# reindexing data
for symbol in self.data:
self.data[symbol] = self.data[symbol].reindex(index=index, method='pad').itertuples()
print(f'Indexed data in {round(time.perf_counter() - t, 3)} seconds.')
if __name__ == '__main__' or __name__ == 'builtins':
source = r'WindowsPath'
x = DataProvider()
x.load_parquet(source_dir=source, timeframe_start='2015', timeframe_end='2015')
Код Dask:
# Standard library imports
import os
import pathlib
import time
# Third party imports
from dask.distributed import Client
import pandas as pd
# Local application imports
def __load_parquet__(directory, timeframe_start, timeframe_end):
return pd.read_parquet(directory).loc[timeframe_start:timeframe_end]
def __reindex__(new_index, df):
return df.reindex(index=new_index, method='pad').itertuples()
if __name__ == '__main__' or __name__ == 'builtins':
client = Client()
source = r'WindowsPath'
start = '2015'
end = '2015'
t = time.perf_counter()
file_list = [file for file in os.listdir(source) if file.endswith('.parquet')]
# build data
data = dict()
for file in file_list:
path = pathlib.Path.joinpath(pathlib.Path(source), file)
symbol = file.replace('.parquet', '')
data[symbol] = client.submit(__load_parquet__, path, start, end)
print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')
t = time.perf_counter()
# build index
index = None
for symbol in data:
if index is not None:
index.union(data[symbol].result().index)
else:
index = data[symbol].result().index
print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')
t = time.perf_counter()
# reindex
for symbol in data:
data[symbol] = client.submit(__reindex__, index, data[symbol].result())
print(f'Indexed data in {round(time.perf_counter() - t, 3)} seconds.')
Я нашел результаты довольно странными.
Последовательный код:
- макс памятипотребление во время вычислений: 30,2 ГБ
- потребление памяти в конце вычислений: 15,6 ГБ
- общее потребление памяти (без Windows и других): 11,6 ГБ
- Loaded daта в 54,289 секунд.
- Встроенный индекс в 0,428 секунд.
- Переиндексированные данные в 9,666 секунд.
Код Dask:
- максимальное потребление памяти во время вычислений: 25.2 ГБ
- потребление памяти в концевычисления: 22,6 ГБ
- общее потребление памяти (без Windows и других): 18,9 ГБ
- Загруженные данные в 0,638 секунд.
- Встроенный индекс в 27,541 секунд.
- Переиндексированные данные в 30,179 секунд.
Мои вопросы:
- Почему в Dask потребление памяти в конце вычислений намного выше?
- Почему в Dask создание общего индекса и переиндексация всех кадров данных занимает так много времени?
Кроме того, при использовании кода Dask консоль выводит мне следующую ошибку.
C:\Users\edit\Anaconda3\envs\edit\lib\site-packages\distribute\worker.py:901:UserWarning: Large object of size 5.41 MB detected in task graph:
(DatetimeIndex(['2015-01-02 09:30:00', '2015-01-02 ... s x 5 columns])
Consider scattering large objects ahead of time with client.scatter to reduce scheduler burden and keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s))
Даже если предложения об ошибках действительно хороши, яне понимаю, что не так с моим кодом.Почему говорится хранить данные о работниках ?Я думал, что с помощью метода отправки я отправляю все данные своему клиенту, и поэтому работники имеют легкий доступ ко всем данным.Спасибо всем за помощь.