Как вернуть первый аргумент потока вместе с возвратом потока из очереди? - PullRequest
0 голосов
/ 24 апреля 2018

Ниже у меня есть класс Threader, который я использую для создания потока произвольного числа функций, а затем возвращаю список возвратов потоковых функций после присоединения к потокам.Одна функция, которую я хочу, это возможность вернуть словарь вместо списка.Я нашел один метод для этого, требуя, чтобы многопоточные функции возвращали кортеж.Первое значение кортежа будет затем использовано для ключа.Вместо этого я хочу иметь его так, чтобы в качестве ключа использовался первый аргумент потоковой функции.

Я узнал, что потокам можно присвоить имена, поэтому я установил имена, которые должны быть заданы в качестве первого аргумента функции потоковна создание потока.Сам поток может получить доступ к имени с помощью getName (), но как мне получить имя потока, следующего в строке, чтобы быть .get () из очереди?(Как мне получить доступ к объектам потока в очереди?)

Мне просто нужно, чтобы он работал, как описано в первом абзаце, поэтому я открыт для альтернативных методов достижения того же эффекта.

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        thread_queue (Queue): The queue that holds the threads.
        threads (Thread list): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.thread_queue = Queue()
        self.threads = []


    def add_thread(self, func, args):
        """add a function to be threaded"""
        self.threads.append(Thread(
            name=args[0], # Custom name using function's first argument
            target=lambda queue, func_args: queue.put(func(*func_args)),
            args=(self.thread_queue, args)))
        self.threads[-1].start()


    def get_results(self, return_dict=False):
        """block threads until all are done, then return their results

        Args:
            return_dict (bool): Return a dict instead of a list. Requires 
                each thread to return a tuple with two values.
        """

        for thread in self.threads:
            thread.join()

        if return_dict:
            results = {}
            while not self.thread_queue.empty():
                # Setting the dictionary key with returned tuple
                # How to access thread's name?
                key, value = self.thread_queue.get()
                results[key] = value
        else:
            results = []
            while not self.thread_queue.empty():
                results.append(self.thread_queue.get())

        return results

Пример использования:

threader = Threader()
for region in regions:
    # probe_region is a function, and (region, tag_filter) are args for it
    threader.add_thread(probe_region, (region, tag_filter))
results = threader.get_results()

Редактировать: Что я сейчас использую:

Моя очищенная и улучшенная версия Ответ Маккея (возврат отсортирован повставка резьбы):

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        result_queue (Queue): Thread-safe queue that holds the results.
        threads (list[Thread]): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.result_queue = Queue()
        self.threads = []


    def worker(self, func, fargs):
        """insert threaded function into queue to make its return retrievable

        The index of the thread and the threaded function's first arg are 
        inserted into the queue, preceding the threaded function itself.

        Args: See add_thread
        """
        return self.result_queue.put([
            len(self.threads), fargs[0], func(*fargs)])


    def add_thread(self, func, fargs):
        """add a function to be threaded

        Args:
            func (function): Function to thread.
            fargs (tuple): Argument(s) to pass to the func function.

        Raises:
            ValueError: If func isn't callable, or if fargs not a tuple.
        """

        if not callable(func):
            raise ValueError("func must be a function.")
        if not isinstance(fargs, tuple) or not fargs:
            raise ValueError("fargs must be a non-empty tuple.")

        self.threads.append(Thread(target=self.worker, args=(func, fargs)))
        self.threads[-1].start()


    def get_results(self, return_dict=False):
        """block all threads, sort by thread index, then return thread results

        Args:
            return_dict (bool): Return dict instead of list. Threads' 
                function's first argument used as key.
        """

        for thread in self.threads:
            thread.join()

        thread_data = []
        while not self.result_queue.empty():
            thread_data.append(self.result_queue.get())
        thread_data.sort(key=lambda thread_index: thread_index[0])

        if return_dict:
            results = {}
            for _, key, thread_return in thread_data:
                results[key] = thread_return
        else:
            results = []
            for _, _, thread_return in thread_data:
                results.append(thread_return)

        return results

1 Ответ

0 голосов
/ 25 апреля 2018

Если вы хотите получить только результат, описанный в первом абзаце, где вы используете первый аргумент в качестве ключа, вы можете изменить свой код так, чтобы он делал это так:

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        queue (Queue): The thread-safe queue that holds the results.
        threads (Thread list): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.results_queue = Queue()
        self.threads = []

    def worker(self, func, args):
        """run the function and save its results"""
        result = func(*args)
        # save result, along with a key for use later if needed (first argument)
        self.results_queue.put([args[0], result])

    def add_thread(self, func, fargs):
        """add a function to be threaded"""
        self.threads.append(Thread(target = self.worker, args = (func, fargs)))
        self.threads[-1].start()

    def get_results(self, return_dict=False):
        """block threads until all are done, then return their results

        Args:
            return_dict (bool): Return a dict instead of a list. Requires 
                each thread to return a tuple with two values.
        """
        for thread in self.threads:
            thread.join()

        if return_dict:
            results = {}
            while not self.results_queue.empty():
                # set the dictionary key as first argument passed to worker
                key, value = self.results_queue.get()
                results[key] = value
        else:
            results = []
            while not self.results_queue.empty():
                # set the dictionary key as first argument passed to worker
                key, value = self.results_queue.get()
                results.append(value)

        return results

NB это не такНужно хранить сами потоки в очереди, только результаты.(Очередь - хороший выбор для хранения результатов, поскольку она решает проблемы синхронизации доступа.)

В функции worker() вы можете генерировать ключ так, как вам нравится;в приведенном выше коде я использовал первый аргумент, как вы предложили.

Пример использования:

def foo(*args):
    return "foo() " + repr(len(args))

def bar(*args):
    return "bar() " + repr(len(args))

def baz(*args):
    return "baz() " + repr(len(args))

threader = Threader()

threader.add_thread(foo, ["foo_key", "a"])
threader.add_thread(bar, ["bar_key", "b", "c"])
threader.add_thread(baz, ["baz_key", "d", "e", "f"])

print (threader.get_results(True))

Это дает вывод:

{'foo_key': 'foo() 2', 'bar_key': 'bar() 3', 'baz_key': 'baz() 4'}

Надеюсь, это поможет.

...