Python: несколько аргументов в map_async во время __init__ - PullRequest
0 голосов
/ 23 октября 2019

Я пытаюсь написать универсальный класс, который будет обрабатывать многопоточные функции. Он прекрасно работает, когда у меня есть только один аргумент для передачи функции, но как бы я ни изменял ее, я не могу заставить ее работать с двумя аргументами.

Одна из моих проблем заключается в том, что я не могу контролироватьфункции, которые я вызываю, поэтому я не буду знать идентификаторы параметров в функциях, и я не могу изменить функции, чтобы изменить порядок параметров или чтобы функции имели дело с получением одной итерации вместо обработки отдельных параметров.

Я обыскал несколько страниц и почесал голову некоторое время, но для меня это не имеет смысла. Некоторые сайты, которые я прочитал:

Python для многопроцессорной обработки pool.map для нескольких аргументов

http://python.omics.wiki/multiprocessing_map/multiprocessing_partial_function_multiple_arguments

https://www.reddit.com/r/learnpython/comments/8f2p4d/call_a_function_with_multiple_arguments_with/

Я сократил свой код до простого примера, показанного ниже. Генерация блоков не является хорошим примером для многопоточности, но он подойдет для примера. В результате должны появиться несколько блоков разных размеров, созданных с помощью символа @.

Мне нужно немного отойти от своего кода и очистить голову, поэтому я решил попросить о помощи здесь, пока яЯ вдали от клавиатуры.

from multiprocessing import Pool

class Parallel():
    """
    Allows one function to be executed in several threads at the same time, each with it's own parameters

    Attributes:
        function: func
            Function to be executed in parallel
        parameter_list: list
            A list of values to be processed by the function
        thread_limit: int
            Limits the number of threads that can run at one time
        wait: bool
            Will wait for all the functions to complete if True
    """

    def __init__(self, function, parameter_list, thread_limit=4, wait: bool = False):
        """
        The constructor for the Parallel class.

        :param function: Function to be executed in parallel
        :param parameter_list: A list of values to be processed by the function
        :param thread_limit: The maximum number of threads that can run at once
        """

        # Create new thread to hold our jobs
        self._pool = Pool(processes=thread_limit)

        # self._x = self._pool.map_async(function, parameter_list, chunksize=1)  # can't multiply sequence by non-int of type 'tuple'

        self._x = self._pool.starmap_async(function, parameter_list, chunksize=1)  # box() takes from 1 to 2 positional arguments but 19 were given

        # self._x = self._pool.apply_async(function, parameter_list)  # can't multiply sequence by non-int of type 'tuple'

        self._state = "busy"
        self._process_count = len(parameter_list)

        if wait:
            self._x.wait()
            self._state = "done"

    def get(self):
        """
        Read the data from the functions that executed. Will block if threads are still active.

        Returns:
            list: A list of results
        """

        self._x.wait()
        self._state = "done"

        return self._x.get()


def box(size: int, fill: str = "#"):
    """ 
    Returns a square of the specified size, consisting of the fill character
    """
    print("FILL:{}".format(fill))  # Debugging
    print("SIZE:{}".format(size))  #
    line = fill * size + "\n"
    a_box = line * size

    return a_box

if __name__ == '__main__':

    # Generate boxes from 2 to 20 characters in size, made of the "@" character
    box_sizes = (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    fill_char = ("@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@", "@")

    p = Parallel(function=box, parameter_list=(box_sizes, fill_char), wait=True)  # How to write this line???

    results = p.get()

    for box in results:
        print(box)
        print("")

1 Ответ

0 голосов
/ 23 октября 2019

После просмотра этого краткого описания о Starmap, я думаю, я понимаю, где была моя проблема. Я пытался передать несколько параметров в виде двух списков вместо списка для каждой пары параметров.

Решение, которое я нашел, работает. Я создал второй класс для обработки нескольких параметров. Это код, включая несколько тестов, чтобы убедиться, что он работает.

from multiprocessing import Pool

class Parallel():
    """
    Allows one function to be executed in several threads at the same time, each with it's own single parameter
    """

    def __init__(self, function, parameter_list, thread_limit=4, wait: bool = False):
        """
        The constructor for the Parallel class.

        :param function: Function to be executed in parallel
        :param parameter_list: A list of values to be processed by the function
        :param thread_limit: The maximum number of threads that can run at once
        """
        # Create new thread to hold our jobs
        self._pool = Pool(processes=thread_limit)

        self._x = self._pool.map_async(function, parameter_list,chunksize=1)

        self._state = "busy"
        self._process_count = len(parameter_list)

        if wait:
            self._x.wait()
            self._state = "done"

    def get(self):
        """
        Read the data from the functions that executed. Will block if threads are still active.

        Returns:
            list: A list of results
        """
        self._x.wait()
        self._state = "done"

        return self._x.get()


class Parallel2(Parallel):
    """
    Allows one function to be executed in several threads at the same time, each with multiple parameters
    """

    def __init__(self, function, parameter_list, thread_limit=4, wait: bool = False):
        """
        The constructor for the Parallel2 class.

        :param function: Function to be executed in parallel
        :param parameter_list: A list of multiple values to be processed by the function
        :param thread_limit: The maximum number of threads that can run at once
        """
        # Create new thread to hold our jobs
        self._pool = Pool(processes=thread_limit)

        self._x = self._pool.starmap_async(function, parameter_list, chunksize=1)

        self._state = "busy"
        self._process_count = len(parameter_list)

        if wait:
            self._x.wait()
            self._state = "done"


def box(size: int, fill: str = "#"):
    """ 
    Returns a square of the specified size, consisting of the fill character

    :param size: Size of the box
    :param fill: Character used to make the box

    :return: The box in a string
    """
    line = fill * size + "\n"
    a_box = line * size

    return a_box


def just_box(size: int):
    """
    Return a square of the specified size, consisting of the "#" character

    :param size: Size of the box

    :return: The box in a string
    """
    return box(size, fill="#")


def repeat_names(infos):
    """
    Repeats a name a specified number of times
    :param infos: A list containing a name, and a number to repeat the name by

    :return: The name, repeated
    """
    name_str = ""
    for x in range(infos[1]):
        name_str += infos[0]

    return name_str


if __name__ == '__main__':

    # Generate boxes of multiple sizes, made of the "@" character
    box_sizes = (2, 5, 6, 9, 10, 13, 15, 16, 19, 20)
    fill_char = "@"

    param = []
    for box_size in box_sizes:
        param.append((box_size, fill_char))

    # Test passing multiple arguments to a function
    p1 = Parallel2(function=box, parameter_list=param, wait=True, thread_limit=15)

    results = p1.get()

    for box in results:
        print(box)
        print("")


    # Test passing a single argument to a function
    p2 = Parallel(function=just_box, parameter_list=box_sizes, wait=True, thread_limit=15)

    results = p2.get()

    for box in results:
        print(box)
        print("")


    # Test passing a tuple as a single argument to a function
    name_list = [("Bob", 3), ("Mary", 5), ("John", 2)]
    p3 = Parallel(function=repeat_names, parameter_list=name_list, wait=True)

    results = p3.get()
    for result in results:
        print(result)
    print("")


    # Test passing a list as a single argument to a function
    name_list = [["Bob", 3], ["Mary", 5], ["John", 2]]
    p3 = Parallel(function=repeat_names, parameter_list=name_list, wait=True)

    results = p3.get()
    for result in results:
        print(result)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...