Пул потоков Python multiprocessing.dummy выполняет больше задач с `map` без инициализации потока - PullRequest
0 голосов
/ 25 сентября 2018

У меня есть следующий код.Я думаю, что инициализатор запускается до выполнения задач, но, очевидно, я получаю ошибки, указывающие на то, что некоторые задачи выполняются без инициализации этого потока.

import threading
import random
from multiprocessing.dummy import Pool, Value, Queue, Manager

def init_worker():
    global thread_local
    thread_local = threading.local()
    thread_local.worker_idx = random.randint(0, 10)
    print("++++++++++++++++++++++++ worker %s" %  thread_local.worker_idx)


def run(idx):
    print(dir(thread_local))
    worker_idx = thread_local.worker_idx
    print("==================== TASK ID %s by worker %s ====================" % (idx, worker_idx))


pool = Pool(2, init_worker)
pool.map(run, range(10), chunksize=1)

Вывод:

++++++++++++++++++++++++ worker 1
++++++++++++++++++++++++ worker 7
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 0 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 2 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
==================== TASK ID 3 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 4 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 5 by worker 7 ====================
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'worker_idx']
==================== TASK ID 7 by worker 7 ====================
Traceback (most recent call last):
  File "test.py", line 19, in <module>
    pool.map(run, range(10), chunksize=1)
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "test.py", line 14, in run
    worker_idx = thread_local.worker_idx
AttributeError: '_thread._local' object has no attribute 'worker_idx'

Похоже, что оба потока правильно инициализированы, но больше задач было запущено без предварительного запуска инициализатора.Вывод print(dir(thread_local)) очень противоречив.

1 Ответ

0 голосов
/ 25 сентября 2018

Похоже, проблема в инициализаторе.Обратите внимание, что нет распечаток TASK ID ... by worker 1, хотя было ясно, что поток создал локальный объект потока global и назначен атрибуту worker_idx.Это связано с тем, что оба потока пытаются создать локальный поток global , а поток 7 перезаписал локальный объект потока, созданный потоком 1 (а не worker_idx attr), и таким образом уничтожилworker_idx для потока 1. Вместо этого попробуйте создать глобал в главном потоке (тот, который вызывает map).И только присваивайте worker_idx в инициализаторе потока.

...