Я надеюсь, что кто-нибудь сможет мне помочь с некоторыми вопросами о многопроцессорности, и если я использую правильный подход к тому, что я пытаюсь сделать.
Вот пример кода того, что я уже написал, который, я думаю, показывает то, что я пытаюсь выполнить:
import multiprocessing
import time
# Example Class that does some math things
class DoMath(object):
def __init__(self, total):
self.total = total
def add(self, a, b):
print("Addition: ", int(a+b))
return int(a+b)
def sub(self, a, b):
print("Subtraction: ", int(a - b))
return int(a-b)
def totals(self, add, minus):
self.total += (add + minus)
print("Current Total: ", self.total)
return int(self.total)
# Pool worker doing simple tasks, sleep is there so it doesn't complete instantly
def worker(a, b):
time.sleep(1)
add = values.add(a, b)
sub = values.sub(a, b)
values.totals(add, sub)
return values
# Call back function
def callback(x):
print('worker done')
print(x.total)
return x
if __name__ == '__main__':
math_numbers = {"1": {"a": 1, "b": 1}, "2": {"a": 2, "b": 2}}
values = DoMath(0)
pool_size = 1
pool = multiprocessing.Pool(
processes=pool_size
)
for key, params_dict in math_numbers.items():
test = pool.apply_async(worker, args=tuple(), kwds=params_dict, callback=callback)
pool.close()
pool.join()
print(test)
print(values.total)
Когда я запускаю это, я получаю следующие результаты:
Addition: 2
Subtraction: 0
Current Total: 2
worker done
2
Addition: 4
Subtraction: 0
Current Total: 6
worker done
6
<multiprocessing.pool.ApplyResult object at 0x104c5a0b8>
0
Мой первый вопрос вращается вокруг того, как я создал экземпляр класса DoMath
в строке 43. Кажется, что запуск класса, как это работает, но моя проблема связана с моим реальным кодом, все не в одном файле, и у меня нет удалось выяснить, как передать класс в функцию apply_async
. Попытка вставить это в JSON DICT, похоже, не работает для меня. Мой настоящий код - это создание пула соединений, чтобы я мог отправлять запросы REST к API, в настоящее время единственный способ, которым я смог заставить его работать, - это создание нового соединения для каждого работника. Это кажется крайне неэффективным, есть ли лучший способ приблизиться к этому? Вот пример того, что я имею против того, что я хотел бы сделать:
Ток:
def worker(arg1, arg2, arg3):
connection = connect(
url=arg1,
port=arg2
)
connection.api(do_something=arg3)
Что бы я хотел сделать:
def worker(arg3, connection):
connection.api(do_something=arg3)
Во-вторых, и это может быть фундаментальным недоразумением, но есть ли способ обработать результаты этих рабочих до полного завершения пула? С моим реальным кодом у меня работает более 15000 рабочих, и каждый из них вернет список длиной примерно 10 тыс. Элементов. Я хотел бы, чтобы они были записаны в файл, но вместо того, чтобы создавать более 15 000 файлов, я хотел бы, чтобы процесс добавлял их в файлы, переворачивая на новый каждые 100 тыс. Строк. Я понимаю, что могу заставить каждого работника просто записать свои выходные данные в файл, либо внутри работника, либо в обратном вызове, но я беспокоюсь, что если я попытаюсь выполнить процесс опрокидывания, когда несколько работников записывают один и тот же файл одновременно, данные могут быть потеряны или неправильно переданы.
В строках 56 и 57 первого блока кода, которым я поделился, я немного запутался в этих результатах. Я не совсем уверен, как я могу использовать объект ApplyResult
или что он должен содержать. Более того, я действительно не понимаю, почему объект values.total
за пределами рабочих возвращает 0, когда и пулы, и обратные вызовы возвращают фактическое значение шесть. Что здесь происходит?