многопроцессорные очереди очереди - PullRequest
0 голосов
/ 05 февраля 2020

Есть ли способ, с помощью которого я могу запустить несколько элементов из очереди в рабочий пул?

У меня есть производитель, заполняющий очередь элементами для обработки, и потребитель, работающий в фоновом режиме, ожидающий выполнения задач .

Я бы хотел, чтобы потребитель взял несколько заданий одновременно:

import multiprocessing as mp
import time

def producer(queue):
    print "produce"
    for i in range(0,100,5):
        for j in range(i,i+5,1):
            queue.put(i)
            print "Producer put: %s" % i
        time.sleep(1)
    queue.put(None)


def consumer2(queue):
    my_pool = mp.Pool(5)
    init_size = queue.qsize()
    for i in range(0, init_size, 5):
        items = [queue.get() for k in range(0,4)]
        my_pool.map(test_worker, items)
    my_pool.close()
    my_pool.join()

def test_worker(inp):
    print "Received %s as input" % inp
    time.sleep(0.4)
    return

if __name__ == '__main__':
    pool = mp.Pool(3)
    data_q = mp.JoinableQueue()
    p = pool.Process(target=consumer2, args=(data_q, ))
    c = pool.Process(target=producer, args=(data_q, ))
    c.start()
    p.start()
    p.join()
    c.join()

Вот вывод, который я наблюдаю:

produce
Producer put: 0
Producer put: 0
Producer put: 0
Producer put: 0
Producer put: 0
Received 0 as input
Received 0 as input
Received 0 as input
Received 0 as input
Producer put: 5
Producer put: 5
Producer put: 5
Producer put: 5
Producer put: 5
Producer put: 10
Producer put: 10
Producer put: 10
Producer put: 10
Producer put: 10
Producer put: 15
Producer put: 15
Producer put: 15
Producer put: 15
Producer put: 15
...

Кажется, что потребитель завершает работу после обработки первых 5 задач. Я бы хотел, чтобы потребитель проверял очередь после выполнения пакета задач вместо выхода.

...