У меня есть следующий код.Я думаю, что инициализатор запускается до выполнения задач, но, очевидно, я получаю ошибки, указывающие на то, что некоторые задачи выполняются без инициализации этого потока.
import threading
import random
from multiprocessing.dummy import Pool, Value, Queue, Manager
def init_worker():
global thread_local
thread_local = threading.local()
thread_local.worker_idx = random.randint(0, 10)
print("++++++++++++++++++++++++ worker %s" % thread_local.worker_idx)
def run(idx):
print(dir(thread_local))
worker_idx = thread_local.worker_idx
print("==================== TASK ID %s by worker %s ====================" % (idx, worker_idx))
pool = Pool(2, init_worker)
pool.map(run, range(10), chunksize=1)
Вывод:
++++++++++++++++++++++++ worker 1
++++++++++++++++++++++++ worker 7
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 0 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 2 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
==================== TASK ID 3 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 4 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 5 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 7 by worker 7 ====================
Traceback (most recent call last):
File "test.py", line 19, in <module>
pool.map(run, range(10), chunksize=1)
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib64/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "test.py", line 14, in run
worker_idx = thread_local.worker_idx
AttributeError: '_thread._local' object has no attribute 'worker_idx'
Похоже, что оба потока правильно инициализированы, но больше задач было запущено без предварительного запуска инициализатора.Вывод print(dir(thread_local))
очень противоречив.