Почему random.sample иногда используется с мультипроцессором. Иногда блокировка пула? - PullRequest
3 голосов
/ 26 февраля 2020

Когда я запускаю следующий фрагмент, иногда он блокируется и не завершается sh, но иногда это происходит. Почему это так? Я использую Python 3.8 в Ubuntu 16.04 (4.4.0-173-generi c).

from functools import partial
from multiprocessing.pool import Pool
from random import sample

pool = Pool(4)
result = pool.map(partial(sample, range(10)), range(10))

То же самое происходит, когда я создаю экземпляр fre sh random.Random для каждого вызов функции:

def sample(data, k):
    rand = random.Random()
    return rand.sample(data, k)

В случае зависания и отправки SIGINT я получаю следующую трассировку, однако не могу понять:

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "[...]/multiprocessing/util.py", line 277, in _run_finalizers
Process ForkPoolWorker-2:
Process ForkPoolWorker-1:
Process ForkPoolWorker-3:
    finalizer()
  File "[...]/multiprocessing/util.py", line 201, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "[...]/multiprocessing/pool.py", line 689, in _terminate_pool
    cls._help_stuff_finish(inqueue, task_handler, len(pool))
  File "[...]/multiprocessing/pool.py", line 674, in _help_stuff_finish
    inqueue._rlock.acquire()
KeyboardInterrupt
Traceback (most recent call last):
Traceback (most recent call last):
  File "[...]/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "[...]/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "[...]/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "[...]/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "[...]/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "[...]/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "[...]/multiprocessing/queues.py", line 355, in get
    with self._rlock:
  File "[...]/multiprocessing/queues.py", line 355, in get
    with self._rlock:
  File "[...]/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "[...]/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
Traceback (most recent call last):
  File "[...]/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "[...]/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "[...]/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "[...]/multiprocessing/queues.py", line 356, in get
    res = self._reader.recv_bytes()
  File "[...]/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "[...]/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "[...]/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Process ForkPoolWorker-4:
Traceback (most recent call last):
  File "[...]/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "[...]/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "[...]/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "[...]/multiprocessing/queues.py", line 355, in get
    with self._rlock:
  File "[...]/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

1 Ответ

1 голос
/ 27 февраля 2020

Это открытая ошибка в Python 3.8. Это не относится к random, причина в том, что очистка рабочих процессов выполняется неправильно. Например, также следующие взаимоблокировки:

from multiprocessing.pool import Pool

def test(x):
    return 'test'

pool = Pool(4)
result = pool.map(test, range(10))

Решением является либо вызов pool.close() вручную после возврата map, либо использование объекта пула в качестве диспетчера контекста:

with Pool(4) as pool:
    result = pool.map(test, range(10))
...