Dask обработка кадров данных ненормально медленная с большими входными файлами - PullRequest
0 голосов
/ 26 февраля 2020

Я использую код python3, который принимает в качестве входных данных очень большой файл (5,9 ГБ, ~ 240 миллионов строк, 3 столбца).

Предыдущая версия кода построчно анализировала этот файл, и каждые 500 000 строк отправляла бы процесс на multiprocessing.Pool() для анализа этих строк.

Эта версия работает, но она медленная из-за ограничения ввода-вывода, определяемого построчным чтением файла. Поэтому я решил попробовать и реализовать более умный способ сделать это, используя комбинацию pandas, dask и numpy.

Текущая версия кода выполняет следующее:

  • Большой файл сначала полностью читается как dask кадр данных (с отложенной загрузкой)
  • Фрейм данных dask записывается в parquet, разделяя разделы по ключам, найденным в первом столбце файла (~ 20 ключей)
  • Код перебирает вышеупомянутые ключи, загружая только необходимые раздел для экономии памяти
  • Загруженный раздел преобразуется в pandas фрейм данных для анализа
  • pandas фрейм данных разделяется на windows с 500 000 бит / с (как и оригинал). код)
  • Каждое окно отправляется как процесс на multiprocessing.Pool()

Проблема, с которой я сталкиваюсь, заключается в том, что этой новой версии кода требуется ~ 35 часов для завершения предыдущая версия завершена за ~ 1,5 часа. Однако с уменьшенным набором данных (~ 5% от общего файла) новая версия на намного быстрее, чем старая.

В самом коде нет ошибок, но я полагаю, мне не хватает чего-то более теоретического о том, как использовать dask? Этот вопрос звонит кому-нибудь, кто мог бы указать мне в любом направлении?

Я не добавил весь код, потому что а) он очень длинный и б) я не знаю, какую строку на самом деле нужно указывать. Однако, вот что:

как я читаю необработанный файл на фрейме данных dask

dd.read_csv(raw_file_path, 
        sep="\t", 
        names=["Sequence", "Position", "Coverage"],
        dtype={ "Sequence":"category", 
        "Position":"uint32", 
        "Coverage":"uint32"}    )

как я пишу его в паркет

output_dir_prefix = ... # prefix to output directory (with path)
dd.to_parquet(  Cov_df, "{0}.parquet".format(output_dir_prefix), 
        engine="pyarrow", 
        partition_on=["Sequence"])

как я читаю раздел обратно

scaf = ... #(the key from the iterable)
Cov_df = dd.read_parquet("parquet/Sequence={0}".format(scaf),
        engine="pyarrow").iloc[:,0,1]]

как я конвертирую его в pandas датафрейм

start = ... # integer
end = ... # integer
Window_lines =  pd.DataFrame(Cov_df[
        (Cov_df['Position'] >= start) & 
        (Cov_df['Position'] <= end) ].compute())

Если у кого-нибудь есть какие-либо предположения о том, что я могу делать неправильно или что я недостаточно оптимизирую, я с радостью буду признателен! Кажется, проблема связана с размером входного файла (с небольшими входными файлами он быстрее, с полными входными файлами - медленнее).

Вот файл, который похож на мой необработанный файл (с точки зрения размера, строк и столбцов): https://bokubox.boku.ac.at/index.php/get/8d57cf66bc7e79a6ebe644dc43eb9035/Genome_C.nonrep.5x.depth

...