Ваш код работает, но он не так прост, как мог бы быть. Давайте пройдемся по коду.
Это создает Call
экземпляр в основном процессе:
def delegate(func, *args, **kwargs):
cb = Call.create()
но когда вы передаете cb
рабочему процессу,
mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()
экземпляр Call
копируется во время os.fork
, создавая тем самым второй отдельный экземпляр. Тогда
вызывает cb.done
и вызывает cb._replace
, который возвращает третий Call
экземпляр:
def done(self, result=None, error=None):
assert not self.finished, 'Call already finished'
return self._replace(finished=(-1 if error else 1),
result=result, error=error)
Выше приведен закрытый метод namedtuple _replace
. Это могло быть
простые выражения Python, такие как
self.finished = -1 if error else 1
, если Call
был подклассом object
вместо подкласса namedtuple
. Подклассы namedtuple
немного сэкономили при наборе __init__
, но позже это становится довольно неуклюжим, поскольку нам нужно изменить атрибуты namedtuple
...
Между тем, исходный экземпляр Call
, возвращаемый delegate(...)
в вызовах основного процесса, attach
:
delegate(...).attach(on_sleeper_result)
Это изменяет глобальный _CALLBACKS
дикт. Рабочие процессы не могут знать об этом изменении в _CALLBACKS
; в рабочих процессах _CALLBACKS
все еще пустой диктат. Таким образом, вы должны передать экземпляр Call
работника обратно в основной процесс через mp.Queue
, который использует cb.id
для ссылки на нужные функции в _CALLBACKS
.
Так что все это работает, но создает три Call
экземпляра для каждого вызова delegate
, и код может ввести непосвященных в заблуждение, полагая, что все три Call
экземпляра - это один и тот же объект .... Все это работает, но это довольно сложно.
Рассматривали ли вы вместо этого параметр mp.Pool.apply_async
callback
?
import multiprocessing as mp
import logging
import time
import collections
_CALLBACKS=collections.defaultdict(list)
logger=mp.log_to_stderr(logging.DEBUG)
def attach(name,func):
_CALLBACKS[name].append(func)
def delegate(func, *args, **kwargs):
id=kwargs.pop('id')
try:
result=func(*args,**kwargs)
except Exception, err:
result=err
return (id,result)
def sleeper(secs):
assert secs >= 1, 'I need my Augenpflege'
logger.info('sleeper: will go to sleep for %s secs' % secs)
time.sleep(secs)
logger.info('sleeper: woke up - returning result')
return ['sleeper', 'result']
def callback(r):
id,result=r
for func in _CALLBACKS[id]:
func(result)
def on_sleeper_result(r):
if isinstance(r, Exception):
logger.error('on_sleeper_result: got error: %s' % r)
else:
logger.info('on_sleeper_result: got result: %s' % r)
if __name__=='__main__':
pool=mp.Pool()
pool.apply_async(delegate,args=(sleeper, -3),kwds={'id':1},
callback=callback)
attach(1,on_sleeper_result)
pool.apply_async(delegate,args=(sleeper, 3),kwds={'id':2},
callback=callback)
attach(2,on_sleeper_result)
while 1:
logger.info('main: loop')
time.sleep(1)