Проблема в вашем примере заключается в том, что изменения стандартных изменяемых структур в Manager.dict
распространяться не будут. Сначала я покажу вам, как это исправить с помощью менеджера, чтобы потом показать вам лучшие варианты.
multiprocessing.Manager
немного тяжел, поскольку он использует отдельный процесс только для Manager
, а для работы с общим объектом необходимо использовать блокировки для согласованности данных. Если вы запускаете это на одной машине, есть лучшие варианты с multiprocessing.Pool
, в случае, если вам не нужно запускать настроенные Process
классы, и если вам нужно, multiprocessing.Process
вместе с multiprocessing.Queue
будет обычным способом делать это.
Цитируемые части из многопроцессорной документы.
Менеджер
Если стандартный (не-прокси) список или объекты dict содержатся в референте, изменения в этих изменяемых значениях не будут распространяться через диспетчер, поскольку прокси-сервер не может знать, когда изменяются содержащиеся в нем значения. Однако сохранение значения в прокси-контейнере (которое вызывает setitem на прокси-объекте) распространяется через менеджер, и поэтому для эффективного изменения такого элемента можно было бы переназначить измененное значение контейнеру. прокси ...
В вашем случае это будет выглядеть так:
def worker(xrange, return_dict, lock):
"""worker function"""
for x in range(xrange[0], xrange[1]):
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
with lock:
list_x = return_dict['x']
list_y = return_dict['y']
list_x.append(x)
list_y.append(y)
return_dict['x'] = list_x
return_dict['y'] = list_y
Здесь lock
будет экземпляром manager.Lock
, который вы должны передать в качестве аргумента, поскольку вся (теперь) заблокированная операция сама по себе не является атомарной. ( Здесь
более простой пример с Manager
с использованием Lock)
Этот подход, возможно, менее удобен, чем использование вложенных прокси-объектов для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.
Поскольку прокси-объекты Python 3.6 являются вложенными:
Изменено в версии 3.6: общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые все будут управляться и синхронизироваться SyncManager.
Начиная с Python 3.6, вы можете заполнить manager.dict
перед началом многопроцессорной обработки с manager.list
в качестве значений, а затем добавить непосредственно в рабочий без переназначения.
return_dict['x'] = manager.list()
return_dict['y'] = manager.list()
EDIT:
Вот полный пример с Manager
:
import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_util.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges
# def context_timer ... see code snippet in "Pool" section below
def worker(batch_range, return_dict, lock):
"""worker function"""
for x in batch_range:
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
with lock:
return_dict['x'].append(x)
return_dict['y'].append(y)
if __name__ == '__main__':
N_WORKERS = mp.cpu_count()
X_MAX = 100000000
batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
batch_ranges = build_batch_ranges(batch_sizes)
print(batch_ranges)
with Manager() as manager:
lock = manager.Lock()
return_dict = manager.dict()
return_dict['x'] = manager.list()
return_dict['y'] = manager.list()
tasks = [(batch_range, return_dict, lock)
for batch_range in batch_ranges]
with context_timer():
pool = [Process(target=worker, args=args)
for args in tasks]
for p in pool:
p.start()
for p in pool:
p.join()
# Create standard container with data from manager before exiting
# the manager.
result = {k: list(v) for k, v in return_dict.items()}
print(result)
Бассейн
Чаще всего multiprocessing.Pool
просто сделает это. У вас есть дополнительная проблема в вашем примере, так как вы хотите распределить итерации по диапазону.
Ваша chunker
функция не может разделить диапазон, даже если каждый процесс выполняет одну и ту же работу:
chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]] # 4, 4, 4, 6!
Для приведенного ниже кода, пожалуйста, возьмите фрагмент кода для mp_utils.py
из моего ответа здесь , он обеспечивает две функции для чанковских диапазонов, насколько это возможно.
С помощью multiprocessing.Pool
ваша worker
функция просто должна вернуть результат, а Pool
позаботится о передаче результата обратно по внутренним очередям обратно в родительский процесс. result
будет списком, поэтому вам придется снова изменить порядок результатов так, как вы хотите. Ваш пример может выглядеть так:
import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain
from mp_utils import calc_batch_sizes, build_batch_ranges
@contextmanager
def context_timer():
start_time = time.perf_counter()
yield
end_time = time.perf_counter()
total_time = end_time-start_time
print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
print(f'Total time: {total_time:.4f} s\n')
def worker(batch_range):
"""worker function"""
result = []
for x in batch_range:
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
result.append((x, y))
return result
if __name__ == '__main__':
N_WORKERS = mp.cpu_count()
X_MAX = 100000000
batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
batch_ranges = build_batch_ranges(batch_sizes)
print(batch_ranges)
with context_timer():
with Pool(N_WORKERS) as pool:
results = pool.map(worker, iterable=batch_ranges)
print(f'results: {results}')
x, y = zip(*chain.from_iterable(results)) # filter and sort results
print(f'results sorted: x: {x}, y: {y}')
Пример вывода:
[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000),
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at: -15 0
Condition met at: -3 1
Condition met at: 11 2
Each iteration took: 0.0000 s
Total time: 8.2408 s
results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)
Process finished with exit code 0
Если бы у вас было несколько аргументов для вашего worker
, вы бы создали список «задач» с аргументами-кортежами и обменялись pool.map(...)
с pool.starmap(...iterable=tasks)
. См. Документы для получения дополнительной информации об этом.
Обработка и очередь
Если по какой-то причине вы не можете использовать multiprocessing.Pool
, вы должны принять
позаботьтесь о межпроцессном взаимодействии (МПК) самостоятельно, передав
multiprocessing.Queue
в качестве аргумента для ваших рабочих функций в child-
процессы и позволяя им ставить в очередь свои результаты для отправки обратно в
Родитель.
Вам также нужно будет построить свою структуру, похожую на пул, чтобы вы могли перебирать ее, чтобы запускать и присоединяться к процессам, и вам нужно get()
вернуть результаты из очереди. Подробнее о Queue.get
использовании я написал здесь .
Решение с таким подходом может выглядеть следующим образом:
def worker(result_queue, batch_range):
"""worker function"""
result = []
for x in batch_range:
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
result.append((x, y))
result_queue.put(result) # <--
if __name__ == '__main__':
N_WORKERS = mp.cpu_count()
X_MAX = 100000000
result_queue = mp.Queue() # <--
batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
batch_ranges = build_batch_ranges(batch_sizes)
print(batch_ranges)
with context_timer():
pool = [Process(target=worker, args=(result_queue, batch_range))
for batch_range in batch_ranges]
for p in pool:
p.start()
results = [result_queue.get() for _ in batch_ranges]
for p in pool:
p.join()
print(f'results: {results}')
x, y = zip(*chain.from_iterable(results)) # filter and sort results
print(f'results sorted: x: {x}, y: {y}')