Проблема
Я пытаюсь отправить объект CPython объемом 2 ГБ, доступный только для чтения (может быть обработан), чтобы распределить рабочих через apply()
.Это в конечном итоге потребляет много памяти для процессов / потоков (14+ ГБ).
Есть ли способ загрузить объект только один раз в память и попросить рабочих одновременно использовать этот объект?
Подробнее о проблеме
У меня есть 2 Dask series Source_listи Pattern_list, содержащий 7 миллионов и 3 миллиона строк соответственно.Я пытаюсь найти все совпадения подстрок в Source_list (7M) из Pattern_list (3M).
Чтобы ускорить поиск по подстроке, я использую пакет pyahocorasick для созданияструктура данных Cpython (объект класса) из Pattern_list (объект может работать с рассолами).
Вещи, которые я пробовал
- , работающий с одним планировщиком dask, занимает около 2,5часы для обработки, но заканчиваются с правильными результатами.
- работа с распределенным dask обычно приводит к:
distributed.worker - WARNING - Memory use is high but worker has no data to
store to disk. Perhaps some other process is leaking memory? Process memory:
2.85 GB -- Worker memory limit: 3.00 GB
работает с распределенным ресурсом, объем памяти которого увеличен до 8 ГБ / 16 ГБ:
Потоки
distributed.worker - WARNING - Memory use is high but worker has no
data to store to disk. Perhaps some other process is leaking
memory?
Process memory: 14.5 GB -- Worker memory limit: 16.00 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
Процессы На обработку уходит более 2,5 часов, и я никогда не видел, чтобы он завершился (оставил его работать на 8+ часов до отмены).Он также потребляет 10+ ГБ памяти
Использование векторизованной строковой операции
Source_list.str.find_all(Pattern_list)
занимает более 2,5 часов. Сохранение объекта в глобальной переменнойи его вызов приводит к той же ошибке, что и в пункте 3 для процессов и потоков. Использование map_partitions + loop / map в Source_list дает те же результаты, что и в пункте 3.
Dask Distributed Code
# OS = Windows 10
# RAM = 16 GB
# CPU cores = 8
# dask version 1.1.1
import dask.dataframe as dd
import ahocorasick
from dask.distributed import Client, progress
def create_ahocorasick_trie(pattern_list):
A = ahocorasick.Automaton()
for index, item in pattern_list.iteritems():
A.add_word(item,item)
A.make_automaton()
return A
if __name__ == '__main__':
client = Client(memory_limit="12GB",processes=False)
# Using Threading, because, the large_object seems to get copied in memory
# for each process when processes = True
Source_list = dd.read_parquet("source_list.parquet")
Pattern_list = dd.read_parquet("pattern_list.parquet")
# Note: 'source_list.parquet' and 'pattern_list.parquet' are generated via dask
large_object = create_ahocorasick_trie(Pattern_list)
result = Source_list.apply(lambda source_text: {large_object.iter(source_text)}, meta=(None,'O'))
# iter() is an ahocorasick Cpython method
progress(result.head(10))
client.close()