Многопроцессорный пул Python map_async зависает - PullRequest
0 голосов
/ 03 мая 2018

У меня есть список из 80 000 строк, которые я запускаю через анализатор дискурса, и чтобы увеличить скорость этого процесса, я пытался использовать многопроцессорный пакет python.

Код парсера требует Python 2.7, и в настоящее время я запускаю его на 2-ядерном компьютере с Ubuntu, используя подмножество строк. Для коротких списков, т. Е. 20, процесс запускается без проблем на обоих ядрах, однако, если я запускаю список из примерно 100 строк, оба рабочих будут зависать в разных точках (поэтому в некоторых случаях рабочий 1 не останавливается в течение нескольких минут после работника 2). Это происходит до того, как все строки завершены и что-либо возвращено. Каждый раз, когда ядра останавливаются в одной и той же точке, если используется одна и та же функция отображения, но эти точки отличаются, если я пытаюсь использовать другую функцию отображения, то есть map против map_async против imap.

Я попытался удалить строки с этими индексами, которые не оказали никакого влияния, и эти строки нормально работают в более коротком списке. На основании операторов печати, которые я включил, когда кажется, что процесс останавливается, текущая итерация заканчивается для текущей строки и просто не переходит к следующей строке. Чтобы добраться до места, где оба работника замерзли, требуется около часа, и я не смог воспроизвести проблему за меньшее время. Код, включающий многопроцессорные команды:

def main(initial_file, chunksize = 2):
    entered_file = pd.read_csv(initial_file)
    entered_file = entered_file.ix[:, 0].tolist()

    pool = multiprocessing.Pool()

    result = pool.map_async(discourse_process, entered_file, chunksize = chunksize)

    pool.close()
    pool.join()

    with open("final_results.csv", 'w') as file:
        writer = csv.writer(file)
        for listitem in result.get():
            writer.writerow([listitem[0], listitem[1]])

if __name__ == '__main__':
    main(sys.argv[1])

Когда я останавливаю процесс с помощью Ctrl-C (который не всегда работает), я получаю сообщение об ошибке:

^CTraceback (most recent call last):
  File "Combined_Script.py", line 94, in <module>
    main(sys.argv[1])
  File "Combined_Script.py", line 85, in main
    pool.join()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 474, in join
    p.join()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 145, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 154, in wait
    return self.poll(0)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 135, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Process PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 117, in worker
    put((job, i, result))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
^CProcess PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 117, in worker
    put((job, i, result))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 392, in put
    return send(obj)
KeyboardInterrupt
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 305, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 500, in _terminate_pool
    outqueue.put(None)                  # sentinel
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 305, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 500, in _terminate_pool
    outqueue.put(None)                  # sentinel
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt

Когда я смотрю на память в другом командном окне, используя htop, память останавливается на уровне <3% после остановки рабочих. Это моя первая попытка параллельной обработки, и я не уверен, что еще мне не хватает? </p>

1 Ответ

0 голосов
/ 17 мая 2018

Мне не удалось решить проблему с многопроцессорным пулом, однако я наткнулся на пакет loky и смог использовать его для запуска моего кода со следующими строками:

executor = loky.get_reusable_executor(timeout = 200, kill_workers = True)
results = executor.map(discourse_process, entered_file)
...