Когда я учился использовать SyncManager для совместного использования общего объекта несколькими процессами, я заметил странное поведение Thread , работающего в контексте SyncManager .
Насколько я понимаю, SyncManager создает процесс сервера, принимая соединения от других процессов, а затем вызывает целевой метод, используя Thread . (https://github.com/python/cpython/blob/master/Lib/multiprocessing/managers.py#L184)
Однако в моем примере кода ниже идентификатор потока, получаемый путем вызова threading.get_ident()
, возвращает то же значение.
Я добавил RLock версия вызванного метода вместе с неблокированной версией. Из того, что я вижу по временной шкале журнала, он ведет себя как ожидалось - разные потоки вызывают метод cube
, но как получается один и тот же идентификатор потока?
Я понимаю, что такой же идентификатор потока может возникать в другом потоке после завершения последнего потока. Но это не объясняет поведение без RLock .
from multiprocessing import Process
from multiprocessing.managers import SyncManager
import time
import os
import logging
import threading
from threading import RLock
class MyManager(SyncManager):
pass
class CommonObj:
def __init__(self):
self.logger = logging.getLogger(
"COMMON-%d-%d" % (os.getpid(), threading.get_ident())
)
self.lock = RLock()
def cube_lock(self, x):
with self.lock:
self.logger.info("lock acquired")
time.sleep(3)
result = x * x * x
self.logger.info("lock released")
return result
def cube(self, x):
self.logger.info("cube called")
time.sleep(3)
result = x * x * x
self.logger.info("cube finish")
return result
def worker(cube):
logger = logging.getLogger("WORKER-%d" % os.getpid())
logger.info("calling common_obj.cube")
result = cube(2)
logger.info("result is %d" % result)
def main():
logging.basicConfig(level=logging.INFO)
MyManager.register("CommonObj", CommonObj)
processes = list()
with MyManager() as manager:
common_obj = manager.CommonObj()
print("Calling cube without RLock:")
for i in range(3):
p = Process(target=worker, args=(common_obj.cube,))
processes.append(p)
p.start()
for p in processes:
p.join()
print("joined %r" % p)
print("\n\nCalling cube with RLock:")
processes.clear()
for i in range(3):
p = Process(target=worker, args=(common_obj.cube_lock,))
processes.append(p)
p.start()
for p in processes:
p.join()
print("joined %r" % p)
if __name__ == "__main__":
main()
Calling cube without RLock:
INFO:WORKER-1389:calling common_obj.cube
INFO:WORKER-1390:calling common_obj.cube
INFO:COMMON-1388-123145579134976:cube called
INFO:COMMON-1388-123145579134976:cube called
INFO:WORKER-1391:calling common_obj.cube
INFO:COMMON-1388-123145579134976:cube called
INFO:COMMON-1388-123145579134976:cube finish
INFO:COMMON-1388-123145579134976:cube finish
INFO:WORKER-1389:result is 8
INFO:COMMON-1388-123145579134976:cube finish
INFO:WORKER-1391:result is 8
INFO:WORKER-1390:result is 8
joined <Process(Process-2, stopped)>
joined <Process(Process-3, stopped)>
joined <Process(Process-4, stopped)>
Calling cube with RLock:
INFO:WORKER-1394:calling common_obj.cube
INFO:COMMON-1388-123145579134976:lock acquired
INFO:WORKER-1395:calling common_obj.cube
INFO:WORKER-1396:calling common_obj.cube
INFO:COMMON-1388-123145579134976:lock released
INFO:COMMON-1388-123145579134976:lock acquired
INFO:WORKER-1394:result is 8
joined <Process(Process-5, stopped)>
INFO:COMMON-1388-123145579134976:lock released
INFO:COMMON-1388-123145579134976:lock acquired
INFO:WORKER-1395:result is 8
joined <Process(Process-6, stopped)>
INFO:COMMON-1388-123145579134976:lock released
INFO:WORKER-1396:result is 8
joined <Process(Process-7, stopped)>