Dask высокое потребление памяти при загрузке нескольких кадров данных Pandas в словаре - PullRequest
0 голосов
/ 12 ноября 2018

У меня есть папка (7,7 ГБ) с несколькими кадрами данных панд, которые хранятся в формате файла паркета.Мне нужно загрузить все эти кадры данных в словаре Python, но, поскольку у меня есть только 32 ГБ ОЗУ, я использую метод .loc, чтобы просто загрузить нужные мне данные.

Когда все кадры данных загружаются в памятьв python я создаю общий индекс из индексов всех данных, а затем переиндексирую все кадры данных с новым индексом.

Я разработал два сценария для этого, первый в классическомпоследовательный способ, второй использует Dask в oder, чтобы получить некоторое улучшение производительности от всех ядер моего Threadripper 1920x.

последовательный код:

# Standard library imports
import os
import pathlib
import time

# Third party imports
import pandas as pd

# Local application imports


class DataProvider:

def __init__(self):

    self.data = dict()

def load_parquet(self, source_dir: str, timeframe_start: str, timeframe_end: str) -> None:

    t = time.perf_counter()

    symbol_list = list(file for file in os.listdir(source_dir) if file.endswith('.parquet'))

    # updating containers
    for symbol in symbol_list:

        path = pathlib.Path.joinpath(pathlib.Path(source_dir), symbol)
        name = symbol.replace('.parquet', '')

        self.data[name] = pd.read_parquet(path).loc[timeframe_start:timeframe_end]

    print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')

    t = time.perf_counter()

    # building index
    index = None

    for symbol in self.data:

        if index is not None:
            index.union(self.data[symbol].index)
        else:
            index = self.data[symbol].index

    print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')

    t = time.perf_counter()

    # reindexing data
    for symbol in self.data:

        self.data[symbol] = self.data[symbol].reindex(index=index, method='pad').itertuples()

    print(f'Indexed data in {round(time.perf_counter() - t, 3)} seconds.')


if __name__ == '__main__' or __name__ == 'builtins':

    source = r'WindowsPath'

    x = DataProvider()
    x.load_parquet(source_dir=source, timeframe_start='2015', timeframe_end='2015')

Код Dask:

# Standard library imports
import os
import pathlib
import time

# Third party imports
from dask.distributed import Client
import pandas as pd

# Local application imports


def __load_parquet__(directory, timeframe_start, timeframe_end):
    return pd.read_parquet(directory).loc[timeframe_start:timeframe_end]


def __reindex__(new_index, df):
    return df.reindex(index=new_index, method='pad').itertuples()


if __name__ == '__main__' or __name__ == 'builtins':

    client = Client()

    source = r'WindowsPath'
    start = '2015'
    end = '2015'

    t = time.perf_counter()

    file_list = [file for file in os.listdir(source) if file.endswith('.parquet')]

    # build data
    data = dict()
    for file in file_list:

        path = pathlib.Path.joinpath(pathlib.Path(source), file)
        symbol = file.replace('.parquet', '')

        data[symbol] = client.submit(__load_parquet__, path, start, end)

    print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')

    t = time.perf_counter()

    # build index
    index = None
    for symbol in data:
        if index is not None:
            index.union(data[symbol].result().index)
        else:
            index = data[symbol].result().index

    print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')

    t = time.perf_counter()

    # reindex
    for symbol in data:
        data[symbol] = client.submit(__reindex__, index, data[symbol].result())

    print(f'Indexed data in {round(time.perf_counter() - t, 3)} seconds.')

Я нашел результаты довольно странными.

Последовательный код:

  • макс памятипотребление во время вычислений: 30,2 ГБ
  • потребление памяти в конце вычислений: 15,6 ГБ
  • общее потребление памяти (без Windows и других): 11,6 ГБ
  • Loaded daта в 54,289 секунд.
  • Встроенный индекс в 0,428 секунд.
  • Переиндексированные данные в 9,666 секунд.

Код Dask:

  • максимальное потребление памяти во время вычислений: 25.2 ГБ
  • потребление памяти в концевычисления: 22,6 ГБ
  • общее потребление памяти (без Windows и других): 18,9 ГБ
  • Загруженные данные в 0,638 секунд.
  • Встроенный индекс в 27,541 секунд.
  • Переиндексированные данные в 30,179 секунд.

Мои вопросы:

  1. Почему в Dask потребление памяти в конце вычислений намного выше?
  2. Почему в Dask создание общего индекса и переиндексация всех кадров данных занимает так много времени?

Кроме того, при использовании кода Dask консоль выводит мне следующую ошибку.

C:\Users\edit\Anaconda3\envs\edit\lib\site-packages\distribute\worker.py:901:UserWarning: Large object of size 5.41 MB detected in task graph: 
(DatetimeIndex(['2015-01-02 09:30:00', '2015-01-02 ... s x 5 columns])
Consider scattering large objects ahead of time with client.scatter to reduce  scheduler burden and keep data on workers
future = client.submit(func, big_data)    # bad
big_future = client.scatter(big_data)     # good
future = client.submit(func, big_future)  # good
% (format_bytes(len(b)), s))

Даже если предложения об ошибках действительно хороши, яне понимаю, что не так с моим кодом.Почему говорится хранить данные о работниках ?Я думал, что с помощью метода отправки я отправляю все данные своему клиенту, и поэтому работники имеют легкий доступ ко всем данным.Спасибо всем за помощь.

...