Сбой на Bag.to_avro из-за Killed / MemoryError на большом наборе данных - PullRequest
0 голосов
/ 09 декабря 2018

Я пытаюсь обработать большой набор текстовых файлов, разделенных новыми строками.Файлы распакованы, и я разбил файлы на небольшие куски, где без сжатия они составляют ~ 100 МБ или около того.У меня есть 296 отдельных сжатых файлов с общим несжатым размером ~ 30 ГБ.

Строки NQuads , и я использую Bag для отображения строк в формате, который я могу импортировать в базу данных.Строки складываются по ключу, так что я могу комбинировать строки, относящиеся к одной странице.

Это код, который я использую для чтения файлов и сворачивания их.

with dask.config.set(num_workers=2):
  n_quads_bag = dask.bag.\
    read_text(files)

  uri_nquads_bag = n_quads_bag.\
    map(parser.parse).\
    filter(lambda x: x is not None).\
    map(nquad_tuple_to_page_dict).\
    foldby('uri', binop=binop).\
    pluck(1).\
    map(lang_extract)

Затем я нормализую данные по страницам и объектам.Я делаю это с помощью функции map, которая разбивает вещи на кортеж с (page, entities).Я собираю данные и затем записываю их в два отдельных набора файлов в Avro.

  pages_entities_bag = uri_nquads_bag.\
      map(map_page_entities)

  pages_bag = pages_entities_bag.\
    pluck(0).\
    map(page_extractor).\
    map(extract_uri_details).\
    map(ntriples_to_dict)

  entities_bag = pages_entities_bag.\
    pluck(1) .\
    flatten().\
    map(entity_extractor).\
    map(ntriples_to_dict)

  with ProgressBar():
    pages_bag.to_avro(
      os.path.join(output_folder, 'pages.*.avro'),
      schema=page_avro_scheme,
      codec='snappy',
      compute=True)
    entities_bag.to_avro(
      os.path.join(output_folder, 'entities.*.avro'),
      schema=entities_avro_schema,
      codec='snappy',
      compute=True)

Код не выполняется на pages_bag.to_avro(... compute=True) с Killed/MemoryError.Я поиграл с уменьшением размеров разделов и уменьшил количество процессоров до 2.

Я не прав в настройке compute=True?Это причина того, что весь набор данных заносится в память?Если так, как еще я могу получить файлы для записи?

Или возможно, что разделы страниц или объектов слишком велики для компьютера?

Другой вопрос, который у меня возник, заключается в том, что я неправильно использую Bags, и это правильный подход к проблеме, которую я хочу решить?

Характеристики машины, на которой я работаю:

  • 4 ЦП
  • 16 ГБ ОЗУ
  • 375 Скретч-диск

1 Ответ

0 голосов
/ 10 декабря 2018

Чтобы не допустить исчерпания памяти, нужно сохранить файлы ~ 100 МБ несжатыми и использовать groupby.Как указано в документации Dask, вы можете принудительно перемешать ее на диске.groupby поддерживает настройку количества разделов на выходе.

with dask.config.set(num_workers=2):
  n_quads_bag = dask.bag.\
    read_text(files)

  uri_nquads_bag = n_quads_bag.\
    map(parser.parse).\
    filter(lambda x: x is not None).\
    map(nquad_tuple_to_page_dict).\
    groupby(lambda x: x[3], shuffle='disk', npartitions=n_quads_bag.npartitions).\        
    map(grouped_nquads_to_dict).\
    map(lang_extract)
...