Ниже у меня есть класс Threader, который я использую для создания потока произвольного числа функций, а затем возвращаю список возвратов потоковых функций после присоединения к потокам.Одна функция, которую я хочу, это возможность вернуть словарь вместо списка.Я нашел один метод для этого, требуя, чтобы многопоточные функции возвращали кортеж.Первое значение кортежа будет затем использовано для ключа.Вместо этого я хочу иметь его так, чтобы в качестве ключа использовался первый аргумент потоковой функции.
Я узнал, что потокам можно присвоить имена, поэтому я установил имена, которые должны быть заданы в качестве первого аргумента функции потоковна создание потока.Сам поток может получить доступ к имени с помощью getName (), но как мне получить имя потока, следующего в строке, чтобы быть .get () из очереди?(Как мне получить доступ к объектам потока в очереди?)
Мне просто нужно, чтобы он работал, как описано в первом абзаце, поэтому я открыт для альтернативных методов достижения того же эффекта.
from queue import Queue
from threading import Thread
class Threader(object):
"""thread arbitrary number of functions, then block when results wanted
Attributes:
thread_queue (Queue): The queue that holds the threads.
threads (Thread list): Threads of functions added with add_thread.
"""
def __init__(self):
self.thread_queue = Queue()
self.threads = []
def add_thread(self, func, args):
"""add a function to be threaded"""
self.threads.append(Thread(
name=args[0], # Custom name using function's first argument
target=lambda queue, func_args: queue.put(func(*func_args)),
args=(self.thread_queue, args)))
self.threads[-1].start()
def get_results(self, return_dict=False):
"""block threads until all are done, then return their results
Args:
return_dict (bool): Return a dict instead of a list. Requires
each thread to return a tuple with two values.
"""
for thread in self.threads:
thread.join()
if return_dict:
results = {}
while not self.thread_queue.empty():
# Setting the dictionary key with returned tuple
# How to access thread's name?
key, value = self.thread_queue.get()
results[key] = value
else:
results = []
while not self.thread_queue.empty():
results.append(self.thread_queue.get())
return results
Пример использования:
threader = Threader()
for region in regions:
# probe_region is a function, and (region, tag_filter) are args for it
threader.add_thread(probe_region, (region, tag_filter))
results = threader.get_results()
Редактировать: Что я сейчас использую:
Моя очищенная и улучшенная версия Ответ Маккея (возврат отсортирован повставка резьбы):
from queue import Queue
from threading import Thread
class Threader(object):
"""thread arbitrary number of functions, then block when results wanted
Attributes:
result_queue (Queue): Thread-safe queue that holds the results.
threads (list[Thread]): Threads of functions added with add_thread.
"""
def __init__(self):
self.result_queue = Queue()
self.threads = []
def worker(self, func, fargs):
"""insert threaded function into queue to make its return retrievable
The index of the thread and the threaded function's first arg are
inserted into the queue, preceding the threaded function itself.
Args: See add_thread
"""
return self.result_queue.put([
len(self.threads), fargs[0], func(*fargs)])
def add_thread(self, func, fargs):
"""add a function to be threaded
Args:
func (function): Function to thread.
fargs (tuple): Argument(s) to pass to the func function.
Raises:
ValueError: If func isn't callable, or if fargs not a tuple.
"""
if not callable(func):
raise ValueError("func must be a function.")
if not isinstance(fargs, tuple) or not fargs:
raise ValueError("fargs must be a non-empty tuple.")
self.threads.append(Thread(target=self.worker, args=(func, fargs)))
self.threads[-1].start()
def get_results(self, return_dict=False):
"""block all threads, sort by thread index, then return thread results
Args:
return_dict (bool): Return dict instead of list. Threads'
function's first argument used as key.
"""
for thread in self.threads:
thread.join()
thread_data = []
while not self.result_queue.empty():
thread_data.append(self.result_queue.get())
thread_data.sort(key=lambda thread_index: thread_index[0])
if return_dict:
results = {}
for _, key, thread_return in thread_data:
results[key] = thread_return
else:
results = []
for _, _, thread_return in thread_data:
results.append(thread_return)
return results