Понимание того, как использовать многопроцессорные пулы - PullRequest
0 голосов
/ 29 мая 2019

Я надеюсь, что кто-нибудь сможет мне помочь с некоторыми вопросами о многопроцессорности, и если я использую правильный подход к тому, что я пытаюсь сделать.

Вот пример кода того, что я уже написал, который, я думаю, показывает то, что я пытаюсь выполнить:

import multiprocessing
import time


# Example Class that does some math things
class DoMath(object):
    def __init__(self, total):
        self.total = total

    def add(self, a, b):
        print("Addition: ", int(a+b))
        return int(a+b)

    def sub(self, a, b):
        print("Subtraction: ", int(a - b))
        return int(a-b)

    def totals(self, add, minus):
        self.total += (add + minus)
        print("Current Total: ", self.total)
        return int(self.total)


# Pool worker doing simple tasks, sleep is there so it doesn't complete instantly
def worker(a, b):
    time.sleep(1)
    add = values.add(a, b)
    sub = values.sub(a, b)
    values.totals(add, sub)
    return values


# Call back function
def callback(x):
    print('worker done')
    print(x.total)
    return x


if __name__ == '__main__':
    math_numbers = {"1": {"a": 1, "b": 1}, "2": {"a": 2, "b": 2}}

    values = DoMath(0)

    pool_size = 1
    pool = multiprocessing.Pool(
        processes=pool_size
    )

    for key, params_dict in math_numbers.items():
        test = pool.apply_async(worker, args=tuple(), kwds=params_dict, callback=callback)

    pool.close()
    pool.join()

    print(test)
    print(values.total)

Когда я запускаю это, я получаю следующие результаты:

Addition:  2
Subtraction:  0
Current Total:  2
worker done
2
Addition:  4
Subtraction:  0
Current Total:  6
worker done
6
<multiprocessing.pool.ApplyResult object at 0x104c5a0b8>
0

Мой первый вопрос вращается вокруг того, как я создал экземпляр класса DoMath в строке 43. Кажется, что запуск класса, как это работает, но моя проблема связана с моим реальным кодом, все не в одном файле, и у меня нет удалось выяснить, как передать класс в функцию apply_async. Попытка вставить это в JSON DICT, похоже, не работает для меня. Мой настоящий код - это создание пула соединений, чтобы я мог отправлять запросы REST к API, в настоящее время единственный способ, которым я смог заставить его работать, - это создание нового соединения для каждого работника. Это кажется крайне неэффективным, есть ли лучший способ приблизиться к этому? Вот пример того, что я имею против того, что я хотел бы сделать:

Ток:

def worker(arg1, arg2, arg3):
    connection = connect(
        url=arg1,
        port=arg2
    )
    connection.api(do_something=arg3)

Что бы я хотел сделать:

def worker(arg3, connection):
    connection.api(do_something=arg3)

Во-вторых, и это может быть фундаментальным недоразумением, но есть ли способ обработать результаты этих рабочих до полного завершения пула? С моим реальным кодом у меня работает более 15000 рабочих, и каждый из них вернет список длиной примерно 10 тыс. Элементов. Я хотел бы, чтобы они были записаны в файл, но вместо того, чтобы создавать более 15 000 файлов, я хотел бы, чтобы процесс добавлял их в файлы, переворачивая на новый каждые 100 тыс. Строк. Я понимаю, что могу заставить каждого работника просто записать свои выходные данные в файл, либо внутри работника, либо в обратном вызове, но я беспокоюсь, что если я попытаюсь выполнить процесс опрокидывания, когда несколько работников записывают один и тот же файл одновременно, данные могут быть потеряны или неправильно переданы.

В строках 56 и 57 первого блока кода, которым я поделился, я немного запутался в этих результатах. Я не совсем уверен, как я могу использовать объект ApplyResult или что он должен содержать. Более того, я действительно не понимаю, почему объект values.total за пределами рабочих возвращает 0, когда и пулы, и обратные вызовы возвращают фактическое значение шесть. Что здесь происходит?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...