Многопроцессорные блокировки во время больших вычислений с помощью Pool (). Apply_async - PullRequest
1 голос
/ 22 октября 2019

У меня есть проблема в Python 3.7.3, когда моя многопроцессорная операция (с использованием Queue, Pool и apply_async) блокируется при обработке больших вычислительных задач.

Для небольших вычислений эта многопроцессорная задача отлично работает. Однако при работе с более крупными процессами многопроцессорная задача останавливается или блокируется вообще без выхода из процесса! Я читал, что это произойдет, если вы «увеличиваете свою очередь без границ, и вы присоединяетесь к подпроцессу, который ожидает места в очереди [...], ваш основной процесс останавливается, ожидая его завершения, и онникогда не буду."( Process.join () и очередь не работают с большими числами )

У меня проблемы с преобразованием этой концепции в код. Я был бы очень признателен за рекомендации по рефакторингу кода, который я написал ниже:

import multiprocessing as mp

def listener(q, d):  # task to queue information into a manager dictionary
    while True:
        item_to_write = q.get()
        if item_to_write == 'kill':
            break
        foo = d['region']
        foo.add(item_to_write) 
        d['region'] = foo  # add items and set to manager dictionary


def main():
    manager = mp.Manager()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()

    pool = mp.Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d))
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))  # task for multiprocessing
        jobs.append(job)
    for job in jobs:
        job.get()  # begin multiprocessing task
    q.put('kill')  # kill multiprocessing task (view listener function)
    pool.close()
    pool.join()

    print('process complete')


if __name__ == '__main__':
    main()

В конечном счете, я бы хотел полностью исключить взаимоблокировку, чтобы упростить многопроцессорную задачу, которая может выполняться бесконечно до завершения.


Внизу находится трассировка при выходе из DEADLOCK в BASH

^CTraceback (most recent call last):
  File "multithread_search_cl_gamma.py", line 260, in <module>
    main(GEOTAG)
  File "multithread_search_cl_gamma.py", line 248, in main
    job.get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 651, in get
Process ForkPoolWorker-28:
Process ForkPoolWorker-31:
Process ForkPoolWorker-30:
Process ForkPoolWorker-27:
Process ForkPoolWorker-29:
Process ForkPoolWorker-26:
    self.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 648, in wait
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
     self._event.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/threading.py", line 552, in wait
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
    signaled = self._cond.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
   with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

Ниже обновленный скрипт:

import multiprocessing as mp
import queue

def listener(q, d, stop_event):
    while not stop_event.is_set():
        try:
            while True:
                item_to_write = q.get(False)
                if item_to_write == 'kill':
                    break
                foo = d['region']
                foo.add(item_to_write)
                d['region'] = foo
        except queue.Empty:
            pass

        time.sleep(0.5)
        if not q.empty():
            continue


def main():
    manager = mp.Manager()
    stop_event = manager.Event()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()
    pool = mp.get_context("spawn").Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d, stop_event))
    stop_event.set()
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))
        jobs.append(job)
    for job in jobs:
        job.get()
    q.put('kill')
    pool.close()
    pool.join()
    print('process complete')


if __name__ == '__main__':
    main()

UPDATE ::

execute_command выполняет несколькопроцессы, необходимые для поиска, поэтому я вставляю код для q.put().

В одиночку выполнение сценария займет> 72 часа. Каждый мультипроцесс никогда не завершает всю задачу, скорее, они работают индивидуально и ссылаются на manager.dict(), чтобы избежать повторения задач. Эти задачи работают до тех пор, пока не будет обработан каждый кортеж в manager.dict().

def area(self, tup, housing_dict, q):
    state, reg, sub_reg = tup[0], tup[1], tup[2]
    for cat in housing_dict:
        """
        computationally expensive, takes > 72 hours
        for a list of 512 tup(s)
        """
        result = self.search_geotag(
            state, reg, cat, area=sub_reg
            )
    q.put(tup)

В конечном итоге q.put(tup) помещается в функцию listener для добавления tup к manager.dict()

1 Ответ

1 голос
/ 22 октября 2019

Поскольку listener и execute_search совместно используют один и тот же объект очереди, возможна гонка, в которой execute_search получает 'kill' из очереди раньше, чем listener, поэтому listener застревает в блокировке get()навсегда, поскольку больше нет новых элементов.

Для этого случая вы можете использовать объект Event, чтобы подать сигнал всем процессам на остановку:

import multiprocessing as mp
import queue

def listener(q, d, stop_event):
    while not stop_event.is_set():
        try:
           item_to_write = q.get(timeout=0.1)
           foo = d['region']
           foo.add(item_to_write)
           d['region'] = foo
        except queue.Empty:
            pass
    print("Listener process stopped")

def main():
    manager = mp.Manager()
    stop_event = manager.Event()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()
    pool = mp.get_context("spawn").Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d, stop_event))
    stop_event.set()
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))
        jobs.append(job)
    try:
        for job in jobs: 
            job.get(300) #get the result or throws a timeout exception after 300 seconds
    except multiprocessing.TimeoutError:
         pool.terminate()
    stop_event.set() # stop listener process
    print('process complete')


if __name__ == '__main__':
    main()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...