Позвольте мне ответить на мой собственный вопрос.Ниже приведены некоторые из моих пониманий:
а) apply_async()
возвращается немедленно.Я использую multiprocessing.Manager()
при создании Queue
, Value
и Array
, чтобы избежать ошибки Synchronized objects should only be shared between processes through inheritance
или xxx objects should only be shared between processes through inheritance
b) Использоватьmultiprocessing.Queue
для сигнализации, остановки, завершения рабочих процессов из родительского процесса.
c) Невозможно передавать разные сообщения для разных рабочих процессов, ожидающих в одной и той же очереди.Вместо этого используйте разные очереди.
d) Pool.apply_async()
позволяет только основной функции входа для рабочего процесса принимать один аргумент.В этом случае поместите аргументы в список ([]
e) Мы могли бы использовать multiprocessing.sharedctypes.RawValue()
, multiprocessing.sharedctypes.RawArray()
, multiprocessing.sharedctypes.Value()
и Array
для создания значения ctypes,массив ctypes, значение ctypes с необязательной блокировкой и массив ctypes с необязательными блокировками в общей памяти.Совместно используемые объекты могут передаваться рабочим процессам через аргументы ключевых слов initializer
и initargs
при создании объекта Pool
с использованием multiprocessing.Pool()
.Эти совместно используемые объекты не могут быть переданы с использованием методов Pool.apply_async()
или Pool.map()
f) Стандартная документация Python для многопроцессорной обработки нуждается в обновлении.Например,
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
должно быть записано как class multiprocessing.pool.Pool([processes [, initializer=None [, initargs=None [, maxtaskperchild=None [, context=None]]]]])
import multiprocessing as mp
import time
# Worker process 1
def f1(q):
while True:
x = queue.get(True) # Block until there is message
if x >= 20:
raise Exception(f'f1: I do not like {x}!')
elif x == -1:
print(f'f1: Quit')
return "f1"
v = q[0]
a = q[1]
print(f'f1({x}, {v}, {a})')
# Worker process 2
def f2(q):
while True:
x = queue.get(True) # Block until there is message
if x >= 20:
raise Exception(f'f2: I do not like {x}!')
elif x == -1:
print(f'f2: Quit')
return "f2"
v = q[0]
a = q[1]
print(f'f1({x}, {v}, {a})')
def pInit(q, poolstr):
Initialize global shared variables among processes.
Could possibly share queue and lock here
global queue
queue = q # Point to the global queue in each process
print(f'{poolstr} is initialized')
def succCB(result):
print(f'Success returns = {result}')
def failCB(result):
print(f'Failure returns = {result}')
if __name__ == '__main__':
# Create shared memory to pass data to worker processes
# lock=True for multiple worker processes on the same queue
v1 = mp.Manager().Value('i', 0, lock=True)
a1 = mp.Manager().Array('i', range(20), lock=True)
# lock=False for 1 worker process on the queue
v2 = mp.Manager().Value('i', 0, lock=False)
a2 = mp.Manager().Array('i', range(20), lock=False)
# Create queues for signaling worker processes
queue1 = mp.Manager().Queue()
queue2 = mp.Manager().Queue()
# Creating pool of processes now - fork here
pool1 = mp.Pool(2, initializer=pInit, initargs=(queue1, "pool1"))
pool2 = mp.Pool(1, initializer=pInit, initargs=(queue2, "pool2"))
# Assign entry function for each pool
pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
pool2.apply_async(f2, args=[(v2, a2)], callback=succCB, error_callback=failCB)
# Parent process, worker processes do not see this anymore
# Parent process notifies the worker processes
for x in range(20):
a1[x] = x
a2[x] = x+10
v1.value = 2
v2.value = 18
# Parant processes terminate or quit the worker processes
queue1.put(-1) # Quit properly
queue1.put(20) # Raise exception
queue2.put(-1) # Quit properly
Выходные данные
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
pool2 is initialized
f1(18, Value('i', 18), array('i', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]))
f2: Quit
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
f1: Quit
Success returns = f1
Success returns = f2
Failure returns = f1: I do not like 20!