использование памяти при индексации большого кадра данных dask на одном многоядерном компьютере - PullRequest
0 голосов
/ 29 июня 2018

Я пытаюсь превратить дамп Wikipedia CirrusSearch в защищенный паркетом фрейм данных dask, проиндексированный по заголовку на 16-ядерном экземпляре GCP 450G. Дампы CirrusSearch представлены в виде единого файла в формате json. Английские дампы Wipedia содержат 5M записей и сжаты на 12G и расширены на 90 + G. Важной деталью является то, что записи не совсем плоские.

Самый простой способ сделать это будет

import json
import dask
from  dask import bag as db, dataframe as ddf
from  toolz import curried as tz
from toolz.curried import operator as op

blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')

lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'

source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'

(
 db
 .read_text(source, blocksize=blocksize)
 .map(json.loads)
 .filter(tz.flip(op.contains, 'title'))
 .to_dataframe()
 .set_index('title', npartitions=npartitions)
 .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)

Первая проблема заключается в том, что в планировщике по умолчанию используется только одно ядро. Эту проблему можно избежать, явно используя распределенные или многопроцессорные планировщики.

Большая проблема со всеми планировщиками и настройками, которые я пробовал, - это использование памяти. Похоже, что dask пытается загрузить весь фрейм данных в память при индексации. Даже 450G недостаточно для этого.

  • Как уменьшить использование памяти для этой задачи?
  • Как оценить минимально необходимый объем памяти без проб и ошибок?
  • Есть ли лучший подход?

1 Ответ

0 голосов
/ 29 июня 2018

Почему Dask использует только одно ядро?

Часть JSON-разбора, вероятно, связана с GIL, вы хотите использовать процессы. Однако когда вы, наконец, что-то вычисляете, вы используете фреймы данных, которые обычно предполагают, что вычисления высвобождают GIL (это часто встречается в Pandas), поэтому он использует потоковый бэкэнд по умолчанию. Если вы в основном связаны с этапом синтаксического анализа GIL, то, возможно, вы захотите использовать многопроцессорный планировщик. Это должно решить вашу проблему:

dask.config.set(scheduler='multiprocessing')

Как избежать использования памяти во время фазы set_index

Да, для вычисления set_index требуется полный набор данных. Это сложная проблема. Если вы используете планировщик с одним компьютером (как вы, похоже, делаете), то для этого процесса сортировки необходимо использовать внешнюю структуру данных. Я удивлен, что у него заканчивается память.

Как оценить минимально необходимый объем памяти без проб и ошибок?

К сожалению, сложно оценить размер JSON-подобных данных в памяти на любом языке. Это гораздо проще с плоскими схемами.

Есть ли лучший подход?

Это не решит вашу основную проблему, но вы можете рассмотреть возможность размещения данных в формате Parquet перед тем, как попытается отсортировать все. Затем попробуйте сделать dd.read_parquet(...).set_index(...).to_parquet(...) в изоляции. Это может помочь выделить некоторые расходы.

...