Python многопроцессорная работа pool.map зависает после вызова функции sklearn - PullRequest
0 голосов
/ 27 марта 2020

Я пытаюсь выполнить некоторые вычисления между 2D-массивом и набором 2D-массивов, используя multiprocessing. Предположим, у меня есть матрица mat1 и набор матриц test, в котором я хотел бы вычислить все умножения матриц между mat1 и элементами test. Я использую многопроцессорную обработку для параллельного выполнения вычислений, поскольку размер test очень большой. Однако я заметил, что даже для небольшого test вычисления никогда не завершаются. В частности, программа, кажется, никогда не заканчивает sh вычисление умножения матриц. Похоже, что вызов определенной функции sklearn вызывает проблему. Я написал следующий код, чтобы проиллюстрировать это (я использую partial вместо starmap, потому что я хотел бы использовать imap и tqdm в более поздний момент времени):

from multiprocessing import Pool
from functools import partial
import numpy as np
import sklearn as sklearn

def bar(y, x):

    # this does not seem to complete
    mul = x @ y.T

    # so this does not print
    print('done')

    return mul

def foo():

    mat1 = np.ones((1000000, 14))
    test = (np.ones((1,14)), np.ones((1,14)))

    # these will finish
    print(mat1 @ test[0].T)
    print(mat1 @ test[1].T)

    with Pool(6) as pool:
        result = pool.map(partial(bar, x=mat1), test
        p.close()
        p.join()

if __name__ == "__main__":

    # Causes the hang
    sklearn.metrics.pairwise.rbf_kernel(np.ones((9000, 14)), 
                                        np.ones((9000, 14)))

    foo()

ПРИМЕЧАНИЕ: Для тех, кто не знаком с partial, это из документации:

functools.partial (func [, * args] [, ** ключевые слова])

Возвращает новый частичный объект, который при вызове будет вести себя как забавный c вызывается с аргументами позиционных аргументов и ключевыми словами аргументов ключевых слов.

Я вынужден остановить выполнение вручную, иначе он будет работать вечно. Я правильно не использую multiprocessing?

Для тех, кто интересуется, полный обратный вызов после принудительной остановки можно найти ниже:

--------------------------------------------------------------------------- KeyboardInterrupt                         Traceback (most recent call last) <ipython-input-18-6c073b574e37> in <module>
      8     
      9     sklearn.metrics.pairwise.rbf_kernel(np.ones((9000, 14)), np.ones((9000, 14)))
---> 10     foo()
     11 

<ipython-input-17-d183fc19ae3c> in foo()
     11     with Pool(6) as pool:
     12     # this will not finish
---> 13         result = pool.map(partial(bar, x=mat1), test)
     14         p.close()
     15         p.join()

~/anaconda3/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

~/anaconda3/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    649 
    650     def get(self, timeout=None):
--> 651         self.wait(timeout)
    652         if not self.ready():
    653             raise TimeoutError

~/anaconda3/lib/python3.7/multiprocessing/pool.py in wait(self, timeout)
    646 
    647     def wait(self, timeout=None):
--> 648         self._event.wait(timeout)
    649 
    650     def get(self, timeout=None):

~/anaconda3/lib/python3.7/threading.py in wait(self, timeout)
    550             signaled = self._flag
    551             if not signaled:
--> 552                 signaled = self._cond.wait(timeout)
    553             return signaled
    554 

~/anaconda3/lib/python3.7/threading.py in wait(self, timeout)
    294         try:    # restore state no matter what (e.g., KeyboardInterrupt)
    295             if timeout is None:
--> 296                 waiter.acquire()
    297                 gotit = True
    298             else:

KeyboardInterrupt:

ОБНОВЛЕНИЕ 1:

После дополнительной отладки я обнаружил нечто странное. После реализации кода sokato мне удалось исправить этот пример. Однако я могу снова вызвать проблему при вызове следующей функции sklearn прямо перед foo() в main():

sklearn.metrics.pairwise.rbf_kernel(np.ones((9000, 14)), np.ones((9000, 14)))

Я обновил исходное сообщение, чтобы отразить это.

1 Ответ

0 голосов
/ 27 марта 2020

Вам необходимо закрыть многопроцессорный пул. например,

def bar(y, x):

    # this does not seem to complete
    mul = x @ y.T

    # so this does not print
    print('done')

    return mul

def foo():

    mat1 = np.ones((1000000, 14))
    test = (np.ones((1,14)), np.ones((1,14)))

    with Pool(5) as p:
    # this will not finish
        result = p.map(partial(bar, x=mat1), test)
        p.close()

if __name__ == "__main__":

    foo()

Чтобы соответствовать вашему точному синтаксису, вы можете сделать это следующим образом:

    pool = Pool(6)
    result = pool.map(partial(bar, x=mat1), test)
    pool.close()

Если вам интересно узнать больше, я рекомендую вам ознакомиться с документацией. https://docs.python.org/3.4/library/multiprocessing.html?highlight=process#multiprocessing .pool.Pool

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...