Я реализовал многопроцессорный итератор, который обрабатывает элементы с использованием нескольких рабочих.Для досрочного увольнения работников с помощью метода close()
, указанного ниже, в рабочие ставится очередь, и эти работники отвечают отправкой сторожа обратно.
it = PoolIter()
for _ in range(8):
next(it)
it.close() # Raises sometimes a RuntimeError('Cannot join worker')
Проблема, с которой я сталкиваюсь, возникает в основном несколько разнаугад: работник все еще жив, хотя все стражи были обработаны, и вызов join()
рабочего прекратится.
Я предполагаю, что в очереди есть некоторые висячие элементы, но я не могу объяснить, почему они все ещетам.
def _run_worker(done_event, job_queue, result_queue):
while True:
index = job_queue.get()
if done_event.is_set() and index is None:
result_queue.put(None)
break
# Clear up job_queue until sentinel is get
if done_event.is_set():
continue
# Do heavy work here
result_queue.put(index)
result_queue.close()
class PoolIter:
def __init__(self):
self._done_event = multiprocessing.Event()
self._job_queue = multiprocessing.Queue()
self._result_queue = multiprocessing.Queue()
self._workers = []
for worker_id in range(32):
worker = multiprocessing.Process(target=_run_worker,
args=(self._done_event, self._job_queue, self._result_queue))
worker.daemon = True
worker.start()
self._workers.append(worker)
self._index_iter = iter(range(1024))
self._receive_index_iter = iter(range(1024)))
self._ordered_results = dict()
for _ in range(2 * 32):
self._queue_next()
def __iter__(self):
return self
def __next__(self):
next_receive_index = next(self._receive_index_iter)
try:
start_time = time.time()
while next_receive_index not in self._ordered_results:
if time.time() - start_time > self.timeout:
raise RuntimeError('Timeout after %0.1fs' % self.timeout)
try:
index = self._result_queue.get(timeout=1)
except queue.Empty:
continue
else:
self._ordered_results[index] = index
return self._ordered_results.pop(next_receive_index)
finally:
self._queue_next()
def close(self):
# Tell the workers to skip heavy processing
self._done_event.set()
# Send sentinels to each worker
for _ in self._workers:
self._job_queue.put(None)
self._job_queue.close()
# Clear up result queue and wait for all sentinels from workers
num_running_workers = len(self._workers)
start_time = time.time()
while num_running_workers:
if time.time() - start_time > self.timeout:
raise RuntimeError('Worker is not getting sentinel end signal')
try:
item = self._result_queue.get(timeout=MP_STATUS_CHECK_INTERVAL)
except queue.Empty:
continue
if item is None:
num_running_workers -= 1
start_time = time.time()
# Join workers and remove zombie processes
for worker in self._workers:
worker.join(self.timeout)
if worker.is_alive():
raise RuntimeError('Cannot join worker')
self._workers.clear()
def _queue_next(self):
index = next(self._index_iter, None)
if index is None:
return
self._job_queue.put(index)