Невозможно присоединиться к многопроцессорным рабочим с очередью - PullRequest
0 голосов
/ 06 июня 2019

Я реализовал многопроцессорный итератор, который обрабатывает элементы с использованием нескольких рабочих.Для досрочного увольнения работников с помощью метода close(), указанного ниже, в рабочие ставится очередь, и эти работники отвечают отправкой сторожа обратно.

it = PoolIter()
for _ in range(8):
   next(it)
it.close() # Raises sometimes a RuntimeError('Cannot join worker')

Проблема, с которой я сталкиваюсь, возникает в основном несколько разнаугад: работник все еще жив, хотя все стражи были обработаны, и вызов join() рабочего прекратится.

Я предполагаю, что в очереди есть некоторые висячие элементы, но я не могу объяснить, почему они все ещетам.

def _run_worker(done_event, job_queue, result_queue):
    while True:
        index = job_queue.get()
        if done_event.is_set() and index is None:
            result_queue.put(None)
            break
        # Clear up job_queue until sentinel is get
        if done_event.is_set():
            continue
        # Do heavy work here
        result_queue.put(index)
    result_queue.close()

class PoolIter:

    def __init__(self):
        self._done_event = multiprocessing.Event()
        self._job_queue = multiprocessing.Queue()
        self._result_queue = multiprocessing.Queue()
        self._workers = []
        for worker_id in range(32):
            worker = multiprocessing.Process(target=_run_worker,
                    args=(self._done_event, self._job_queue, self._result_queue))
            worker.daemon = True
            worker.start()
            self._workers.append(worker)

        self._index_iter = iter(range(1024))
        self._receive_index_iter = iter(range(1024)))
        self._ordered_results = dict()

        for _ in range(2 * 32):
            self._queue_next()

    def __iter__(self):
        return self

    def __next__(self):
        next_receive_index = next(self._receive_index_iter)
        try:
            start_time = time.time()
            while next_receive_index not in self._ordered_results:
                if time.time() - start_time > self.timeout:
                    raise RuntimeError('Timeout after %0.1fs' % self.timeout)
                try:
                    index = self._result_queue.get(timeout=1)
                except queue.Empty:
                    continue
                else:
                    self._ordered_results[index] = index
            return self._ordered_results.pop(next_receive_index)
        finally:
            self._queue_next()

    def close(self):
        # Tell the workers to skip heavy processing
        self._done_event.set()

        # Send sentinels to each worker
        for _ in self._workers:
            self._job_queue.put(None)
        self._job_queue.close()

        # Clear up result queue and wait for all sentinels from workers
        num_running_workers = len(self._workers)
        start_time = time.time()
        while num_running_workers:
            if time.time() - start_time > self.timeout:
                raise RuntimeError('Worker is not getting sentinel end signal')
            try:
                item = self._result_queue.get(timeout=MP_STATUS_CHECK_INTERVAL)
            except queue.Empty:
                continue
            if item is None:
                num_running_workers -= 1
                start_time = time.time()

        # Join workers and remove zombie processes
        for worker in self._workers:
            worker.join(self.timeout)
            if worker.is_alive():
                raise RuntimeError('Cannot join worker')
        self._workers.clear()

    def _queue_next(self):
        index = next(self._index_iter, None)
        if index is None:
            return
        self._job_queue.put(index)
...