Есть ли способ, с помощью которого я могу запустить несколько элементов из очереди в рабочий пул?
У меня есть производитель, заполняющий очередь элементами для обработки, и потребитель, работающий в фоновом режиме, ожидающий выполнения задач .
Я бы хотел, чтобы потребитель взял несколько заданий одновременно:
import multiprocessing as mp
import time
def producer(queue):
print "produce"
for i in range(0,100,5):
for j in range(i,i+5,1):
queue.put(i)
print "Producer put: %s" % i
time.sleep(1)
queue.put(None)
def consumer2(queue):
my_pool = mp.Pool(5)
init_size = queue.qsize()
for i in range(0, init_size, 5):
items = [queue.get() for k in range(0,4)]
my_pool.map(test_worker, items)
my_pool.close()
my_pool.join()
def test_worker(inp):
print "Received %s as input" % inp
time.sleep(0.4)
return
if __name__ == '__main__':
pool = mp.Pool(3)
data_q = mp.JoinableQueue()
p = pool.Process(target=consumer2, args=(data_q, ))
c = pool.Process(target=producer, args=(data_q, ))
c.start()
p.start()
p.join()
c.join()
Вот вывод, который я наблюдаю:
produce
Producer put: 0
Producer put: 0
Producer put: 0
Producer put: 0
Producer put: 0
Received 0 as input
Received 0 as input
Received 0 as input
Received 0 as input
Producer put: 5
Producer put: 5
Producer put: 5
Producer put: 5
Producer put: 5
Producer put: 10
Producer put: 10
Producer put: 10
Producer put: 10
Producer put: 10
Producer put: 15
Producer put: 15
Producer put: 15
Producer put: 15
Producer put: 15
...
Кажется, что потребитель завершает работу после обработки первых 5 задач. Я бы хотел, чтобы потребитель проверял очередь после выполнения пакета задач вместо выхода.