Python concurrent.futures Ошибка BrokenProcessPool, возникающая из UMAP fit_transform в рекурсивной функции - PullRequest
0 голосов
/ 25 июня 2019

Я пытаюсь распараллелить рекурсивную программу, которая использует UMAP fit_transform на каждом рекурсивном шаге, используя concurrent.futures. Программа по существу применяет UMAP fit_transform к фрейму данных pandas, а затем разбивает этот фрейм данных и рекурсивно применяет fit_transform снова. Также следует отметить, что на каждом шаге он передает объект, содержащий новый фрейм данных pandas. Я сталкиваюсь с ошибкой "Broken Process Pool", которая, кажется, идет из исключения queue.full. Также представляется, что в каждом процессе программа сталкивается с ошибкой во время шага UMAP fit_transform. Я не параллельно в UMAP, я просто вызываю рекурсивную функцию в нескольких отдельных процессах, каждый из которых использует UMAP. Кто-нибудь сталкивался с чем-то подобным при распараллеливании рекурсивной функции? Существует ли вероятность чрезмерного использования памяти при рекурсивном создании объектов для распараллеливания?

Я пытался увеличить лимиты ОЗУ и различные многопроцессорные библиотеки в Python (concurrent.futures, пул многопроцессорной обработки и т. Д.). Я также обнаружил, что при нерекурсивном использовании я могу безошибочно применить UMAP fit_transform.

Вот как я настраиваю шаг распараллеливания:

class UmapObj:
    // Hold a pandas dataframe
    // Has a method (doUmap) that applies UMAP on the dataframe,
    // then splits it into sub matrices and applies UMAP again
    // recursively, at each stage joins matrices together


def doUmapWrap(UmapObj):
    UmapObj.doUmap()

ex = futures.ProcessPoolExecutor(max_workers=4)

arr = [UmapObj1, UmapObj2, UmapObj3] // Note: arr contains UmapObjects
outarr = []

for f in arr:
    i = ex.submit(doUmapWrap, f)
    outarr.append(i)

for i in outarr:
    print(i.result().df)

Это трассировка ошибки, которую я получаю:

"/ Users / anantmaheshwari / anaconda / lib / python3.6 / concurrent / futures / _base.py", строка 405, в результате Исключение в потоке Thread-1: Traceback (последний вызов был последним): Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/threading.py", строка 916, в _bootstrap_inner self.run () Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/threading.py", строка 864, в работе self._target (* self._args, ** self._kwargs) Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/concurrent/futures/process.py", строка 295, в _queue_management_worker shutdown_worker () Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/concurrent/futures/process.py", строка 253, в shutdown_worker call_queue.put_nowait (отсутствует) Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/multiprocessing/queues.py", строка 129, в put_nowait вернуть self.put (obj, False) Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/multiprocessing/queues.py", строка 83, на месте поднять Full queue.Full

вернуть себя .__ get_result () Файл "/Users/anantmaheshwari/anaconda/lib/python3.6/concurrent/futures/_base.py", строка 357, в __get_result поднять себя. Исключение concurrent.futures.process.BrokenProcessPool: Процесс в пуле процессов был внезапно прерван во время работы или ожидания будущего.

...