Я пытаюсь распараллелить рекурсивную программу, которая использует UMAP fit_transform на каждом рекурсивном шаге, используя concurrent.futures. Программа по существу применяет UMAP fit_transform к фрейму данных pandas, а затем разбивает этот фрейм данных и рекурсивно применяет fit_transform снова. Также следует отметить, что на каждом шаге он передает объект, содержащий новый фрейм данных pandas.
Я сталкиваюсь с ошибкой "Broken Process Pool", которая, кажется, идет из исключения queue.full. Также представляется, что в каждом процессе программа сталкивается с ошибкой во время шага UMAP fit_transform.
Я не параллельно в UMAP, я просто вызываю рекурсивную функцию в нескольких отдельных процессах, каждый из которых использует UMAP. Кто-нибудь сталкивался с чем-то подобным при распараллеливании рекурсивной функции? Существует ли вероятность чрезмерного использования памяти при рекурсивном создании объектов для распараллеливания?
Я пытался увеличить лимиты ОЗУ и различные многопроцессорные библиотеки в Python (concurrent.futures, пул многопроцессорной обработки и т. Д.). Я также обнаружил, что при нерекурсивном использовании я могу безошибочно применить UMAP fit_transform.
Вот как я настраиваю шаг распараллеливания:
class UmapObj:
// Hold a pandas dataframe
// Has a method (doUmap) that applies UMAP on the dataframe,
// then splits it into sub matrices and applies UMAP again
// recursively, at each stage joins matrices together
def doUmapWrap(UmapObj):
UmapObj.doUmap()
ex = futures.ProcessPoolExecutor(max_workers=4)
arr = [UmapObj1, UmapObj2, UmapObj3] // Note: arr contains UmapObjects
outarr = []
for f in arr:
i = ex.submit(doUmapWrap, f)
outarr.append(i)
for i in outarr:
print(i.result().df)
Это трассировка ошибки, которую я получаю:
"/ Users / anantmaheshwari / anaconda / lib / python3.6 / concurrent / futures / _base.py", строка 405, в результате
Исключение в потоке Thread-1:
Traceback (последний вызов был последним):
Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/threading.py", строка 916, в _bootstrap_inner
self.run ()
Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/threading.py", строка 864, в работе
self._target (* self._args, ** self._kwargs)
Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/concurrent/futures/process.py", строка 295, в _queue_management_worker
shutdown_worker ()
Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/concurrent/futures/process.py", строка 253, в shutdown_worker
call_queue.put_nowait (отсутствует)
Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/multiprocessing/queues.py", строка 129, в put_nowait
вернуть self.put (obj, False)
Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/multiprocessing/queues.py", строка 83, на месте
поднять Full
queue.Full
вернуть себя .__ get_result ()
Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/concurrent/futures/_base.py", строка 357, в __get_result
поднять себя. Исключение
concurrent.futures.process.BrokenProcessPool: Процесс в пуле процессов был внезапно прерван во время работы или ожидания будущего.