Это может показывать мое упрямство, но я всегда считал, что такие проекты, как сельдерей, которые добавляют сложность поверх многопроцессорной обработки (которая уже сложна), доставляют больше хлопот, чем они того стоят. Также нет лучшей альтернативы imo использованию совместно используемой памяти stdlib и мьютексов с точки зрения скорости и простоты.
В вашем случае простым решением было бы просто использовать очередь fifo для каждого процесса и поместить в каждый кадр от производителя. Это, естественно, привело бы к большому потреблению памяти, если бы вы делали n копий каждого кадра для n потребителей, однако вы, вероятно, могли бы довольно легко придумать механизм для помещения самих кадров в multiprocessing.sharedctypes.Array
и пропускания только индексов через вместо очереди Пока очереди ограничены по длине короче, чем длина буфера, вы должны быть ограничены от перезаписи кадра в буфере, пока он не будет использован всеми потребителями. Без какой-либо синхронизации, это полетело бы у вас под штанами, но немного магии мьютекса определенно могло бы сделать это очень надежным решением.
например:
import numpy as np
from time import sleep
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_uint8
from functools import reduce
BUFSHAPE = (10,10,10) #10 10x10 images in buffer
class Worker(Process):
def __init__(self, q_size, buffer, name=''):
super().__init__()
self.queue = Queue(q_size)
self.buffer = buffer
self.name = name
def run(self,): #do work here
#I hardcoded datatype here. you might need to communicate it to the child process
buf_arr = np.frombuffer(self.buffer.get_obj(), dtype=c_uint8)
buf_arr.shape = BUFSHAPE
while True:
item = self.queue.get()
if item == 'done':
print('child process: {} completed all frames'.format(self.name))
return
with self.buffer.get_lock(): #prevent writing while we're reading
#slice the frame from the array uning the index that was sent
frame = buf_arr[item%BUFSHAPE[0]] #depending on your use, you may want to make a copy here
#do some intense processing on `frame`
sleep(np.random.rand())
print('child process: {} completed frame: {}'.format(self.name, item))
def main():
#creating shared array
buffer = Array(c_uint8, reduce(lambda a,b: a*b, BUFSHAPE))
#make a numpy.array using that memory location to make it easy to stuff data into it
buf_arr = np.frombuffer(buffer.get_obj(), dtype=c_uint8)
buf_arr.shape = BUFSHAPE
#create a list of workers
workers = [Worker(BUFSHAPE[0]-2, #smaller queue than buffer to prevent overwriting frames not yet consumed
buffer, #pass in shared buffer array
str(i)) #numbered child processes
for i in range(5)] #5 workers
for worker in workers: #start the workers
worker.start()
for i in range(100): #generate 100 random frames to send to workers
#insert a frame into the buffer
with buffer.get_lock(): #prevent reading while we're writing
buf_arr[i%BUFSHAPE[0]] = np.random.randint(0,255, size=(10,10), dtype=c_uint8)
#send the frame number to each worker for processing. If the input queue is full, this will block until there's space
# this is what prevents `buf_arr[i%BUFSHAPE[0]] = np...` from overwriting a frame that hasn't been processed yet
for worker in workers:
worker.queue.put(i)
#when we're done send the 'done' signal so the child processes exit gracefully (or you could make them daemons)
for worker in workers:
worker.queue.put('done')
worker.join()
if __name__ == "__main__":
freeze_support()
main()
РЕДАКТИРОВАТЬ
Для некоторой ошибки типа «один за другим» требуется, чтобы очередь была на 2 кадра меньше буфера, а не на 1 кадр меньше, чтобы предотвратить перезапись кадра раньше времени.
EDIT2 - пояснение первого редактирования:
Причина len(q) = len(buf)-2
заключается в том, что q.get()
вызывается до того, как мы получим кадр из буфера, и сам кадр записывается перед тем, как мы пытаемся отправить индекс в очередь. Если разница в длине составляет всего 1, работник может извлечь индекс очереди из очереди, тогда производитель может увидеть, что он может перейти в очередь сейчас и перейти к следующему кадру, прежде чем работник сможет прочитать кадр. сам. Есть много способов, которыми вы могли бы подойти к этому по-другому, что могло бы позволить меньшему количеству процессов, ожидающих друг друга все время, возможно, используя mp.Event
.