Извините заранее, это будет долго ...
Возможно связано:
Ошибка многопроцессорной обработки Python atexit «Ошибка в atexit._run_exitfuncs»
Определенно связаны:
параллельная карта python (multiprocessing.Pool.map) с глобальными данными
Прерывания клавиатуры с помощью многопроцессорного пула Python
Вот "простой" сценарий, который я взломал вместе, чтобы проиллюстрировать мою проблему ...
import time
import multiprocessing as multi
import atexit
cleanup_stuff=multi.Manager().list([])
##################################################
# Some code to allow keyboard interrupts
##################################################
was_interrupted=multi.Manager().list([])
class _interrupt(object):
"""
Toy class to allow retrieval of the interrupt that triggered it's execution
"""
def __init__(self,interrupt):
self.interrupt=interrupt
def interrupt():
was_interrupted.append(1)
def interruptable(func):
"""
decorator to allow functions to be "interruptable" by
a keyboard interrupt when in python's multiprocessing.Pool.map
**Note**, this won't actually cause the Map to be interrupted,
It will merely cause the following functions to be not executed.
"""
def newfunc(*args,**kwargs):
try:
if(not was_interrupted):
return func(*args,**kwargs)
else:
return False
except KeyboardInterrupt as e:
interrupt()
return _interrupt(e) #If we really want to know about the interrupt...
return newfunc
@atexit.register
def cleanup():
for i in cleanup_stuff:
print(i)
return
@interruptable
def func(i):
print(i)
cleanup_stuff.append(i)
time.sleep(float(i)/10.)
return i
#Must wrap func here, otherwise it won't be found in __main__'s dict
#Maybe because it was created dynamically using the decorator?
def wrapper(*args):
return func(*args)
if __name__ == "__main__":
#This is an attempt to use signals -- I also attempted something similar where
#The signals were only caught in the child processes...Or only on the main process...
#
#import signal
#def onSigInt(*args): interrupt()
#signal.signal(signal.SIGINT,onSigInt)
#Try 2 with signals (only catch signal on main process)
#import signal
#def onSigInt(*args): interrupt()
#signal.signal(signal.SIGINT,onSigInt)
#def startup(): signal.signal(signal.SIGINT,signal.SIG_IGN)
#p=multi.Pool(processes=4,initializer=startup)
#Try 3 with signals (only catch signal on child processes)
#import signal
#def onSigInt(*args): interrupt()
#signal.signal(signal.SIGINT,signal.SIG_IGN)
#def startup(): signal.signal(signal.SIGINT,onSigInt)
#p=multi.Pool(processes=4,initializer=startup)
p=multi.Pool(4)
try:
out=p.map(wrapper,range(30))
#out=p.map_async(wrapper,range(30)).get() #This doesn't work either...
#The following lines don't work either
#Effectively trying to roll my own p.map() with p.apply_async
# results=[p.apply_async(wrapper,args=(i,)) for i in range(30)]
# out = [ r.get() for r in results() ]
except KeyboardInterrupt:
print ("Hello!")
out=None
finally:
p.terminate()
p.join()
print (out)
Это работает очень хорошо, если не вызвано KeyboardInterrupt. Однако, если я подниму один, произойдет следующее исключение:
10
7
9
12
^CHello!
None
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "test.py", line 58, in cleanup
for i in cleanup_stuff:
File "<string>", line 2, in __getitem__
File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
self._connect()
File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
c = SocketClient(address)
File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
s.connect(address)
File "<string>", line 1, in connect
error: [Errno 2] No such file or directory
Error in sys.exitfunc:
Traceback (most recent call last):
File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "test.py", line 58, in cleanup
for i in cleanup_stuff:
File "<string>", line 2, in __getitem__
File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
self._connect()
File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
c = SocketClient(address)
File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
s.connect(address)
File "<string>", line 1, in connect
socket.error: [Errno 2] No such file or directory
Интересно, что код выходит из функции Pool.map без вызова каких-либо дополнительных функций ... Проблема, похоже, заключается в том, что KeyboardInterrupt не обрабатывается должным образом в какой-то момент, но это немного сбивает с толку, когда это есть, и почему это не обрабатывается непрерывно. Спасибо.
Обратите внимание, та же проблема возникает, если я использую out=p.map_async(wrapper,range(30)).get()
РЕДАКТИРОВАТЬ 1
Немного ближе ... Если я добавлю out=p.map(...)
в предложение try,except,finally
, это избавит от первого исключения ... однако остальные все еще возникают в atexit. Код и трассировка выше были обновлены.
РЕДАКТИРОВАТЬ 2
Что-то еще, что не работает, было добавлено в код выше в качестве комментария. (Та же ошибка). Эта попытка была вдохновлена:
http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt/
РЕДАКТИРОВАТЬ 3
Еще одна неудачная попытка с использованием сигналов, добавленных к коду выше.
РЕДАКТИРОВАТЬ 4
Я выяснил, как реструктурировать мой код, чтобы вышеописанное больше не требовалось. В (маловероятном) случае, когда кто-то наткнется на этот поток с тем же сценарием использования, который у меня был, я опишу свое решение ...
Вариант использования
У меня есть функция, которая генерирует временные файлы с помощью модуля tempfile
. Я хотел бы, чтобы эти временные файлы были очищены при выходе из программы. Первоначально я пытался упаковать каждое временное имя файла в список, а затем удалить все элементы списка с помощью функции, зарегистрированной через atexit.register
. Проблема в том, что обновленный список не обновлялся в нескольких процессах. Именно здесь у меня появилась идея использовать multiprocessing.Manager
для управления данными списка. К сожалению, это терпит неудачу на KeyboardInterrupt
, как бы я ни старался, потому что сокеты связи между процессами по какой-то причине были сломаны. Решение этой проблемы простое. Перед использованием многопроцессорной обработки установите временный каталог файлов ... что-то вроде tempfile.tempdir=tempfile.mkdtemp()
, а затем зарегистрируйте функцию для удаления временного каталога. Каждый из процессов пишет в один и тот же временный каталог, поэтому он работает. Конечно, это решение работает только тогда, когда общие данные представляют собой список файлов, которые необходимо удалить в конце срока службы программы.