ProcessPoolExecutor, BrokenProcessPool обработка - PullRequest
0 голосов
/ 03 октября 2018

В этой документации (https://pymotw.com/3/concurrent.futures/) говорится:

"ProcessPoolExecutor работает так же, как ThreadPoolExecutor, но использует процессы вместо потоков. Это позволяет выполнять операции, интенсивно использующие процессориспользовать отдельный процессор и не быть заблокированным глобальной блокировкой интерпретатора CPython. "

Звучит великолепно!В нем также говорится:

«Если что-то случится с одним из рабочих процессов и неожиданно завершится, ProcessPoolExecutor будет считаться« сломанным »и больше не будет планировать задачи.»

Это звучит плохо :( Так что, я думаю, мой вопрос: что считается «Неожиданно?». Означает ли это, что сигнал выхода не равен 1? Могу ли я безопасно выйти из потока и все еще продолжать обрабатывать очередь? Например,следующим образом:

from concurrent import futures
import os
import signal


with futures.ProcessPoolExecutor(max_workers=2) as ex:
    print('getting the pid for one worker')
    f1 = ex.submit(os.getpid)
    pid1 = f1.result()

    print('killing process {}'.format(pid1))
    os.kill(pid1, signal.SIGHUP)

    print('submitting another task')
    f2 = ex.submit(os.getpid)
    try:
        pid2 = f2.result()
    except futures.process.BrokenProcessPool as e:
        print('could not start new tasks: {}'.format(e))

1 Ответ

0 голосов
/ 05 мая 2019

Я не видел его IRL, но из кода похоже, что возвращенные дескрипторы файлов не содержат дескриптор файла results_queue.

из concurrent.futures.process:

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:  # <===================================== THIS
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item

функция wait зависит от системы, но при условии, что ОС Linux (на multiprocessing.connection удален весь код, связанный с тайм-аутом):

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        with _WaitSelector() as selector:
            for obj in object_list:
                selector.register(obj, selectors.EVENT_READ)

            while True:
                ready = selector.select(timeout)
                if ready:
                    return [key.fileobj for (key, events) in ready]
                else:
                    # some timeout code

...