Ошибка с многопроцессорной обработкой, atexit и глобальными данными - PullRequest
4 голосов
/ 29 марта 2012

Извините заранее, это будет долго ...

Возможно связано:

Ошибка многопроцессорной обработки Python atexit «Ошибка в atexit._run_exitfuncs»

Определенно связаны:

параллельная карта python (multiprocessing.Pool.map) с глобальными данными

Прерывания клавиатуры с помощью многопроцессорного пула Python

Вот "простой" сценарий, который я взломал вместе, чтобы проиллюстрировать мою проблему ...

import time
import multiprocessing as multi
import atexit

cleanup_stuff=multi.Manager().list([])

##################################################
# Some code to allow keyboard interrupts  
##################################################
was_interrupted=multi.Manager().list([])
class _interrupt(object):
    """
    Toy class to allow retrieval of the interrupt that triggered it's execution
    """
    def __init__(self,interrupt):
        self.interrupt=interrupt

def interrupt():
    was_interrupted.append(1)

def interruptable(func):
    """
    decorator to allow functions to be "interruptable" by
    a keyboard interrupt when in python's multiprocessing.Pool.map
    **Note**, this won't actually cause the Map to be interrupted,
    It will merely cause the following functions to be not executed.
    """
    def newfunc(*args,**kwargs):
        try:
            if(not was_interrupted):
                return func(*args,**kwargs)
            else:
                return False
        except KeyboardInterrupt as e:
            interrupt()
            return _interrupt(e)  #If we really want to know about the interrupt...
    return newfunc

@atexit.register
def cleanup():
    for i in cleanup_stuff:
        print(i)
    return

@interruptable
def func(i):
    print(i)
    cleanup_stuff.append(i)
    time.sleep(float(i)/10.)
    return i

#Must wrap func here, otherwise it won't be found in __main__'s dict
#Maybe because it was created dynamically using the decorator?
def wrapper(*args):
    return func(*args)


if __name__ == "__main__":

    #This is an attempt to use signals -- I also attempted something similar where
    #The signals were only caught in the child processes...Or only on the main process...
    #
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)

    #Try 2 with signals (only catch signal on main process)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)
    #def startup(): signal.signal(signal.SIGINT,signal.SIG_IGN)
    #p=multi.Pool(processes=4,initializer=startup)

    #Try 3 with signals (only catch signal on child processes)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,signal.SIG_IGN)
    #def startup(): signal.signal(signal.SIGINT,onSigInt)
    #p=multi.Pool(processes=4,initializer=startup)


    p=multi.Pool(4)
    try:
        out=p.map(wrapper,range(30))
        #out=p.map_async(wrapper,range(30)).get()  #This doesn't work either...

        #The following lines don't work either
        #Effectively trying to roll my own p.map() with p.apply_async 
        # results=[p.apply_async(wrapper,args=(i,)) for i in range(30)]
        # out = [ r.get() for r in results() ]
    except KeyboardInterrupt:
        print ("Hello!")
        out=None
    finally:
        p.terminate()
        p.join()

    print (out)

Это работает очень хорошо, если не вызвано KeyboardInterrupt. Однако, если я подниму один, произойдет следующее исключение:

10
7
9
12
^CHello!
None
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
   c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
   s.connect(address)
  File "<string>", line 1, in connect
error: [Errno 2] No such file or directory
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
    s.connect(address)
  File "<string>", line 1, in connect
socket.error: [Errno 2] No such file or directory

Интересно, что код выходит из функции Pool.map без вызова каких-либо дополнительных функций ... Проблема, похоже, заключается в том, что KeyboardInterrupt не обрабатывается должным образом в какой-то момент, но это немного сбивает с толку, когда это есть, и почему это не обрабатывается непрерывно. Спасибо.

Обратите внимание, та же проблема возникает, если я использую out=p.map_async(wrapper,range(30)).get()

РЕДАКТИРОВАТЬ 1

Немного ближе ... Если я добавлю out=p.map(...) в предложение try,except,finally, это избавит от первого исключения ... однако остальные все еще возникают в atexit. Код и трассировка выше были обновлены.

РЕДАКТИРОВАТЬ 2

Что-то еще, что не работает, было добавлено в код выше в качестве комментария. (Та же ошибка). Эта попытка была вдохновлена:

http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt/

РЕДАКТИРОВАТЬ 3

Еще одна неудачная попытка с использованием сигналов, добавленных к коду выше.

РЕДАКТИРОВАТЬ 4

Я выяснил, как реструктурировать мой код, чтобы вышеописанное больше не требовалось. В (маловероятном) случае, когда кто-то наткнется на этот поток с тем же сценарием использования, который у меня был, я опишу свое решение ...

Вариант использования

У меня есть функция, которая генерирует временные файлы с помощью модуля tempfile. Я хотел бы, чтобы эти временные файлы были очищены при выходе из программы. Первоначально я пытался упаковать каждое временное имя файла в список, а затем удалить все элементы списка с помощью функции, зарегистрированной через atexit.register. Проблема в том, что обновленный список не обновлялся в нескольких процессах. Именно здесь у меня появилась идея использовать multiprocessing.Manager для управления данными списка. К сожалению, это терпит неудачу на KeyboardInterrupt, как бы я ни старался, потому что сокеты связи между процессами по какой-то причине были сломаны. Решение этой проблемы простое. Перед использованием многопроцессорной обработки установите временный каталог файлов ... что-то вроде tempfile.tempdir=tempfile.mkdtemp(), а затем зарегистрируйте функцию для удаления временного каталога. Каждый из процессов пишет в один и тот же временный каталог, поэтому он работает. Конечно, это решение работает только тогда, когда общие данные представляют собой список файлов, которые необходимо удалить в конце срока службы программы.

...