Я использую код python3, который принимает в качестве входных данных очень большой файл (5,9 ГБ, ~ 240 миллионов строк, 3 столбца).
Предыдущая версия кода построчно анализировала этот файл, и каждые 500 000 строк отправляла бы процесс на multiprocessing.Pool()
для анализа этих строк.
Эта версия работает, но она медленная из-за ограничения ввода-вывода, определяемого построчным чтением файла. Поэтому я решил попробовать и реализовать более умный способ сделать это, используя комбинацию pandas, dask и numpy.
Текущая версия кода выполняет следующее:
- Большой файл сначала полностью читается как dask кадр данных (с отложенной загрузкой)
- Фрейм данных dask записывается в
parquet
, разделяя разделы по ключам, найденным в первом столбце файла (~ 20 ключей) - Код перебирает вышеупомянутые ключи, загружая только необходимые раздел для экономии памяти
- Загруженный раздел преобразуется в pandas фрейм данных для анализа
- pandas фрейм данных разделяется на windows с 500 000 бит / с (как и оригинал). код)
- Каждое окно отправляется как процесс на
multiprocessing.Pool()
Проблема, с которой я сталкиваюсь, заключается в том, что этой новой версии кода требуется ~ 35 часов для завершения предыдущая версия завершена за ~ 1,5 часа. Однако с уменьшенным набором данных (~ 5% от общего файла) новая версия на намного быстрее, чем старая.
В самом коде нет ошибок, но я полагаю, мне не хватает чего-то более теоретического о том, как использовать dask? Этот вопрос звонит кому-нибудь, кто мог бы указать мне в любом направлении?
Я не добавил весь код, потому что а) он очень длинный и б) я не знаю, какую строку на самом деле нужно указывать. Однако, вот что:
как я читаю необработанный файл на фрейме данных dask
dd.read_csv(raw_file_path,
sep="\t",
names=["Sequence", "Position", "Coverage"],
dtype={ "Sequence":"category",
"Position":"uint32",
"Coverage":"uint32"} )
как я пишу его в паркет
output_dir_prefix = ... # prefix to output directory (with path)
dd.to_parquet( Cov_df, "{0}.parquet".format(output_dir_prefix),
engine="pyarrow",
partition_on=["Sequence"])
как я читаю раздел обратно
scaf = ... #(the key from the iterable)
Cov_df = dd.read_parquet("parquet/Sequence={0}".format(scaf),
engine="pyarrow").iloc[:,0,1]]
как я конвертирую его в pandas датафрейм
start = ... # integer
end = ... # integer
Window_lines = pd.DataFrame(Cov_df[
(Cov_df['Position'] >= start) &
(Cov_df['Position'] <= end) ].compute())
Если у кого-нибудь есть какие-либо предположения о том, что я могу делать неправильно или что я недостаточно оптимизирую, я с радостью буду признателен! Кажется, проблема связана с размером входного файла (с небольшими входными файлами он быстрее, с полными входными файлами - медленнее).
Вот файл, который похож на мой необработанный файл (с точки зрения размера, строк и столбцов): https://bokubox.boku.ac.at/index.php/get/8d57cf66bc7e79a6ebe644dc43eb9035/Genome_C.nonrep.5x.depth