Как именовать потоки индивидуально в multiprocessing.dummy.Pool? - PullRequest
0 голосов
/ 03 декабря 2018

Я хочу назвать потоки в multiprocessing.dummy.Pool, чтобы я мог просматривать все их имена при вызове threading.enumerate() из основного потока.Есть ли ключевое слово, которое я могу применить, когда звоню pool.apply_async, чтобы иметь возможность называть темы?Я бы скорее назвал их при создании, чем из функции tester, просто для чистоты.

Например, если у меня есть пример кода ниже:

import multiprocessing.dummy
from time import sleep
import threading

def tester():
    sleep(2)
    print("running \n")

def run_conc(number_of_threads, fxn):
    pool = multiprocessing.dummy.Pool(processes=number_of_threads)
    for thread in range(number_of_threads):
        pool.apply_async(tester)
    print(threading.enumerate(), "\n")
    pool.close()
    pool.join()

run_conc(3, tester)

Когда я запускаюесли я получаю вывод:

[<_MainThread(MainThread, started 140735632434048)>, <Thread(SockThread, started daemon 123145521917952)>, <DummyProcess(Thread-1, started daemon 123145527246848)>, <DummyProcess(Thread-2, started daemon 123145532502016)>, <DummyProcess(Thread-3, started daemon 123145537757184)>, <Thread(Thread-4, started daemon 123145543012352)>, <Thread(Thread-5, started daemon 123145548267520)>, <Thread(Thread-6, started daemon 123145553522688)>] 

running 
running 
running 

Я хочу иметь возможность назвать 3 фиктивных потока в этом списке потоков, чтобы я мог определить, какой из них является каким.Или, может быть, есть способ сделать это из concurrent.futures, который я должен использовать вместо этого?

Ответы [ 3 ]

0 голосов
/ 03 декабря 2018

Хорошо. Я обнаружил, что могу изменить имя потока из функции tester, присвоив значение threading.current_thread().name.Однако, если кто-нибудь знает, как задать имя потока при создании из строки pool.apply_async, это знание будет высоко ценится.

0 голосов
/ 03 декабря 2018

Рабочие потоки в пуле не создаются при вызове pool.apply_async или других методах пула, но уже раньше, когда вы создаете экземпляр пула.При вызовах методов пула используются существующие потоки в пуле.

Невозможно назвать потоки при инициализации без манипулирования источником.Возможны следующие варианты:

  • переименование рабочих потоков после пул готов к созданию экземпляра
  • внутреннее исправление пула обезьяны для принудительного применения определенного шаблона именования

Первый вариант прост в реализации, вы просто перебираете атрибут ._pool экземпляра пула и изменяете .name содержащихся в нем потоков:

from multiprocessing.pool import ThreadPool as Pool


if __name__ == '__main__':

    pool = Pool(4)
    print([w.name for w in pool._pool])
    # ['Thread-1', 'Thread-2', 'Thread-3', 'Thread-4']

    for w in pool._pool:
        w.name = w.name.replace('Thread', 'ThreadPoolWorker')

    print([w.name for w in pool._pool])
    # ['ThreadPoolWorker-1', 'ThreadPoolWorker-2', 'ThreadPoolWorker-3', 'ThreadPoolWorker-4']

    pool.close()
    pool.join()

Обратите внимание, что я использую multiprocessing.pool.ThreadPool здесь, просто чтобы соответствовать примеру во втором варианте ниже, так как multiprocessing.dummy.Pool является просто оберткой вокруг ThreadPool.


Для второго варианта этоможно было бы пропатчить фабричную функцию для рабочих потоков ThreadPool.Process с помощью обертки, расширив имя по умолчанию «Thread-% d» (% d заполнено значением счетчика) с более значимым именем, например «ThreadPoolWorker».'.

# threadpool.py
# Module patching the name of worker-threads within ThreadPool

__all__ = ['ThreadPool']

from functools import wraps
from multiprocessing.pool import ThreadPool


def rename_worker(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        w = fn(*args, **kwargs)
        w.name = w.name.replace('Thread', 'ThreadPoolWorker')
        return w
    return wrapper


ThreadPool.Process = staticmethod(rename_worker(ThreadPool.Process))

Использование:

from threadpool import ThreadPool as Pool


if __name__ == '__main__':

    pool = Pool(4)
    print([w.name for w in pool._pool])
    # ['ThreadPoolWorker-1', 'ThreadPoolWorker-2', 'ThreadPoolWorker-3', 'ThreadPoolWorker-4']
    pool.close()
    pool.join()
0 голосов
/ 03 декабря 2018

Если вы можете отредактировать имя потока, то следующий фрагмент поможет

from multiprocessing import Pool,Queue
import threading

thread_names = Queue()
num_process = 4
for e in ['A','B','C','D']:
    thread_names.put('Thread-{}'.format(e))

def initializer(q):
    thread_name = q.get()
    threading.current_thread().name = thread_name

if __name__ == '__main__':
    pool = Pool(num_process=4,initializer=initializer,initargs=(threadnames,))
...