Прерывания клавиатуры с помощью многопроцессорного пула Python - PullRequest
117 голосов
/ 11 сентября 2009

Как я могу обрабатывать события KeyboardInterrupt с помощью многопроцессорных пулов Python? Вот простой пример:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

При запуске кода выше, KeyboardInterrupt поднимается, когда я нажимаю ^C, но процесс просто зависает в этой точке, и мне приходится убивать его извне.

Я хочу иметь возможность нажать ^C в любое время и заставить все процессы завершиться корректно.

Ответы [ 10 ]

130 голосов
/ 11 сентября 2009

Это ошибка Python. При ожидании условия в threading.Condition.wait () KeyboardInterrupt никогда не отправляется. Репро:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

Исключение KeyboardInterrupt не будет доставлено до тех пор, пока wait () не вернется, и никогда не вернется, поэтому прерывание никогда не происходит. KeyboardInterrupt почти наверняка прервет ожидание условия.

Обратите внимание, что это не происходит, если указан тайм-аут; cond.wait (1) немедленно получит прерывание. Таким образом, обходной путь должен указать время ожидания. Для этого замените

    results = pool.map(slowly_square, range(40))

с

    results = pool.map_async(slowly_square, range(40)).get(9999999)

или аналогичный.

46 голосов
/ 31 мая 2011

Из того, что я недавно обнаружил, лучшее решение - настроить рабочие процессы на полное игнорирование SIGINT и ограничить весь код очистки родительским процессом. Это устраняет проблему как для незанятых, так и для занятых рабочих процессов и не требует кода обработки ошибок в ваших дочерних процессах.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Объяснение и полный пример кода можно найти в http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ и http://github.com/jreese/multiprocessing-keyboardinterrupt соответственно.

25 голосов
/ 01 апреля 2010

По некоторым причинам, только исключения, унаследованные от базового класса Exception, обрабатываются нормально. В качестве обходного пути вы можете повторно поднять свой KeyboardInterrupt как экземпляр Exception:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Обычно вы получите следующий вывод:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Итак, если вы нажмете ^C, вы получите:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
7 голосов
/ 31 октября 2012

Обычно эта простая структура работает для Ctrl - C для пула:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Как было сказано в нескольких похожих постах:

Захват прерывания клавиатуры в Python без попытки-исключая

5 голосов
/ 15 мая 2014

Кажется, есть две проблемы, которые делают исключения, когда раздражает многопроцессорность. Первый (отмеченный Гленном) заключается в том, что вам нужно использовать map_async с таймаутом вместо map, чтобы получить немедленный ответ (т.е. не заканчивать обработку всего списка). Второе (замечено Андреем) состоит в том, что многопроцессорная обработка не захватывает исключения, которые не наследуются от Exception (например, SystemExit). Итак, вот мое решение, которое имеет дело с обоими из них:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    /914573/preryvaniya-klaviatury-s-pomoschy-mnogoprotsessornogo-pula-python

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
4 голосов
/ 02 июля 2017

Голосованный ответ не затрагивает основную проблему, но аналогичный побочный эффект.

Джесси Ноллер, автор многопроцессорной библиотеки, объясняет, как правильно обращаться с CTRL + C при использовании multiprocessing.Pool в старом блоге .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
3 голосов
/ 26 августа 2010

Я обнаружил, что на данный момент лучшее решение - не использовать функцию multiprocessing.pool, а вместо этого использовать собственную функцию пула. Я привел пример, демонстрирующий ошибку с apply_async, а также пример, показывающий, как вообще не использовать функциональность пула.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

1 голос
/ 15 мая 2017

Я новичок в Python. Я всюду искал ответ и наткнулся на этот и несколько других блогов и видео на YouTube. Я попытался скопировать вставить код автора выше и воспроизвести его на моем Python 2.7.13 в Windows 7 64-битной. Это близко к тому, чего я хочу достичь.

Я заставил свои дочерние процессы игнорировать ControlC и заставить родительский процесс завершаться. Похоже, обход дочернего процесса действительно помогает мне избежать этой проблемы.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

Часть, начинающаяся с pool.terminate(), кажется, никогда не выполняется.

0 голосов
/ 14 августа 2018

Вы можете попробовать использовать метод apply_async объекта Pool, например:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Выход:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Преимущество этого метода в том, что результаты, обработанные до прерывания, будут возвращены в словарь результатов:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
0 голосов
/ 11 сентября 2009

Как ни странно, похоже, что вы должны справиться с KeyboardInterrupt и у детей. Я ожидал бы, что это будет работать как написано ... попробуйте изменить slowly_square на:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Это должно работать, как вы ожидали.

...