Позвольте мне ответить на мой собственный вопрос.Ниже приведены некоторые из моих пониманий:
а) apply_async()
возвращается немедленно.Я использую multiprocessing.Manager()
при создании Queue
, Value
и Array
, чтобы избежать ошибки Synchronized objects should only be shared between processes through inheritance
или xxx objects should only be shared between processes through inheritance
.
b) Использоватьmultiprocessing.Queue
для сигнализации, остановки, завершения рабочих процессов из родительского процесса.
c) Невозможно передавать разные сообщения для разных рабочих процессов, ожидающих в одной и той же очереди.Вместо этого используйте разные очереди.
d) Pool.apply_async()
позволяет только основной функции входа для рабочего процесса принимать один аргумент.В этом случае поместите аргументы в список ([]
).
e) Мы могли бы использовать multiprocessing.sharedctypes.RawValue()
, multiprocessing.sharedctypes.RawArray()
, multiprocessing.sharedctypes.Value()
и Array
multiprocessing.sharedctypes.Array()
для создания значения ctypes,массив ctypes, значение ctypes с необязательной блокировкой и массив ctypes с необязательными блокировками в общей памяти.Совместно используемые объекты могут передаваться рабочим процессам через аргументы ключевых слов initializer
и initargs
при создании объекта Pool
с использованием multiprocessing.Pool()
.Эти совместно используемые объекты не могут быть переданы с использованием методов Pool.apply_async()
или Pool.map()
.
f) Стандартная документация Python для многопроцессорной обработки нуждается в обновлении.Например,
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
должно быть записано как class multiprocessing.pool.Pool([processes [, initializer=None [, initargs=None [, maxtaskperchild=None [, context=None]]]]])
import multiprocessing as mp
import time
# Worker process 1
def f1(q):
while True:
x = queue.get(True) # Block until there is message
if x >= 20:
raise Exception(f'f1: I do not like {x}!')
elif x == -1:
print(f'f1: Quit')
return "f1"
else:
time.sleep(0.5)
v = q[0]
a = q[1]
print(f'f1({x}, {v}, {a})')
# Worker process 2
def f2(q):
while True:
x = queue.get(True) # Block until there is message
if x >= 20:
raise Exception(f'f2: I do not like {x}!')
elif x == -1:
print(f'f2: Quit')
return "f2"
else:
time.sleep(0.5)
v = q[0]
a = q[1]
print(f'f1({x}, {v}, {a})')
def pInit(q, poolstr):
'''
Initialize global shared variables among processes.
Could possibly share queue and lock here
'''
global queue
queue = q # Point to the global queue in each process
print(f'{poolstr} is initialized')
def succCB(result):
print(f'Success returns = {result}')
def failCB(result):
print(f'Failure returns = {result}')
if __name__ == '__main__':
# Create shared memory to pass data to worker processes
# lock=True for multiple worker processes on the same queue
v1 = mp.Manager().Value('i', 0, lock=True)
a1 = mp.Manager().Array('i', range(20), lock=True)
# lock=False for 1 worker process on the queue
v2 = mp.Manager().Value('i', 0, lock=False)
a2 = mp.Manager().Array('i', range(20), lock=False)
# Create queues for signaling worker processes
queue1 = mp.Manager().Queue()
queue2 = mp.Manager().Queue()
# Creating pool of processes now - fork here
pool1 = mp.Pool(2, initializer=pInit, initargs=(queue1, "pool1"))
pool2 = mp.Pool(1, initializer=pInit, initargs=(queue2, "pool2"))
# Assign entry function for each pool
pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
pool2.apply_async(f2, args=[(v2, a2)], callback=succCB, error_callback=failCB)
# Parent process, worker processes do not see this anymore
# Parent process notifies the worker processes
for x in range(20):
a1[x] = x
a2[x] = x+10
v1.value = 2
v2.value = 18
queue1.put(1)
queue1.put(1)
queue2.put(18)
# Parant processes terminate or quit the worker processes
queue1.put(-1) # Quit properly
queue1.put(20) # Raise exception
queue2.put(-1) # Quit properly
pool1.close()
pool2.close()
pool1.join()
pool2.join()
Выходные данные
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
pool2 is initialized
f1(18, Value('i', 18), array('i', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]))
f2: Quit
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
f1: Quit
Success returns = f1
Success returns = f2
Failure returns = f1: I do not like 20!