Как разделить большой объект только для чтения среди распределенных работников Dask - PullRequest
0 голосов
/ 09 февраля 2019

Проблема

Я пытаюсь отправить объект CPython объемом 2 ГБ, доступный только для чтения (может быть обработан), чтобы распределить рабочих через apply().Это в конечном итоге потребляет много памяти для процессов / потоков (14+ ГБ).

Есть ли способ загрузить объект только один раз в память и попросить рабочих одновременно использовать этот объект?

Подробнее о проблеме

У меня есть 2 Dask series Source_listи Pattern_list, содержащий 7 миллионов и 3 миллиона строк соответственно.Я пытаюсь найти все совпадения подстрок в Source_list (7M) из Pattern_list (3M).

Чтобы ускорить поиск по подстроке, я использую пакет pyahocorasick для созданияструктура данных Cpython (объект класса) из Pattern_list (объект может работать с рассолами).

Вещи, которые я пробовал

  1. , работающий с одним планировщиком dask, занимает около 2,5часы для обработки, но заканчиваются с правильными результатами.
  2. работа с распределенным dask обычно приводит к:
distributed.worker - WARNING - Memory use is high but worker has no data to 
store to disk. Perhaps some other process is leaking memory? Process memory:  
2.85 GB -- Worker memory limit: 3.00 GB

работает с распределенным ресурсом, объем памяти которого увеличен до 8 ГБ / 16 ГБ:

  • Потоки

    distributed.worker - WARNING - Memory use is high but worker has no 
    data to  store to disk. Perhaps some other process is leaking 
    memory? 
    Process memory:  14.5 GB -- Worker memory limit: 16.00 GB 
    distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
    
  • Процессы На обработку уходит более 2,5 часов, и я никогда не видел, чтобы он завершился (оставил его работать на 8+ часов до отмены).Он также потребляет 10+ ГБ памяти

Использование векторизованной строковой операции Source_list.str.find_all(Pattern_list) занимает более 2,5 часов. Сохранение объекта в глобальной переменнойи его вызов приводит к той же ошибке, что и в пункте 3 для процессов и потоков. Использование map_partitions + loop / map в Source_list дает те же результаты, что и в пункте 3.

Dask Distributed Code

# OS = Windows 10
# RAM = 16 GB
# CPU cores = 8
# dask version 1.1.1

import dask.dataframe as dd
import ahocorasick
from dask.distributed import Client, progress

def create_ahocorasick_trie(pattern_list):
    A = ahocorasick.Automaton()
    for index, item in pattern_list.iteritems():
         A.add_word(item,item)
    A.make_automaton()
    return A 

if __name__ == '__main__':
    client = Client(memory_limit="12GB",processes=False)

    # Using Threading, because, the large_object seems to get copied in memory 
    # for each process when processes = True

    Source_list = dd.read_parquet("source_list.parquet") 
    Pattern_list = dd.read_parquet("pattern_list.parquet")

    # Note: 'source_list.parquet' and 'pattern_list.parquet' are generated via dask

    large_object = create_ahocorasick_trie(Pattern_list)

    result = Source_list.apply(lambda source_text: {large_object.iter(source_text)}, meta=(None,'O'))

    # iter() is an ahocorasick Cpython method

    progress(result.head(10))

    client.close()




1 Ответ

0 голосов
/ 20 февраля 2019

Краткий ответ - обернуть его в вызов с задержкой dask.

big = dask.delayed(big)
df.apply(func, extra=big)

Dask будет перемещать его по мере необходимости и обрабатывать как собственный фрагмент данных.При этом он должен существовать на каждом работнике, поэтому у вас должно быть значительно больше оперативной памяти на одного работника, чем эта штука занимает.(не менее 4х или более).

...