Определение, когда ThreadPool завершил обработку очереди - PullRequest
3 голосов
/ 28 октября 2011

Я пытаюсь реализовать пул потоков, который обрабатывает очередь задач, используя ThreadPool и Queue. Он начинается с начальной очереди задач, а затем каждая из задач может также помещать дополнительные задачи в очередь задач. Проблема в том, что я не знаю, как блокировать, пока очередь не станет пустой и пул потоков не завершит обработку, но все равно проверю очередь и отправлю любые новые задачи в пул потоков, которые были помещены в очередь. Я не могу просто позвонить ThreadPool.join(), потому что мне нужно держать пул открытым для новых задач.

Например:

from multiprocessing.pool import ThreadPool
from Queue import Queue
from random import random
import time
import threading

queue = Queue()
pool = ThreadPool()
stdout_lock = threading.Lock()

def foobar_task():
    with stdout_lock: print "task called" 
    if random() > .25:
        with stdout_lock: print "task appended to queue"
        queue.append(foobar_task)
    time.sleep(1)

# set up initial queue
for n in range(5):
    queue.put(foobar_task)

# run the thread pool
while not queue.empty():
    task = queue.get() 
    pool.apply_async(task)

with stdout_lock: print "pool is closed"
pool.close()
pool.join()

Это выводит:

pool is closed
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue

Это выходит из цикла while до того, как foobar_tasks добавляются в очередь, поэтому добавленные задачи никогда не передаются в пул потоков. Я не могу найти способ определить, есть ли в пуле потоков все еще активные рабочие потоки. Я попробовал следующее:

while not queue.empty() or any(worker.is_alive() for worker in pool._pool):
    if not queue.empty():
        task = queue.get() 
        pool.apply_async(task)
    else:   
        with stdout_lock: print "waiting for worker threads to complete..."
        time.sleep(1)

Но, похоже, worker.is_alive() всегда возвращает true, поэтому это входит в бесконечный цикл.

Есть ли лучший способ сделать это?

1 Ответ

2 голосов
/ 28 октября 2011
  1. Звоните queue.task_done после обработки каждой задачи.
  2. Затем вы можете вызвать queue.join () , чтобы заблокировать основной поток, пока все задачи были выполнены.
  3. Чтобы завершить рабочие потоки, поместите дозорного (например, None) в очередь, и foobar_task вырвется из while-loop, когда получит стража.
  4. Я думаю, что это проще реализовать с threading.Thread s, чем с ThreadPool.

import random
import time
import threading
import logging
import Queue

logger=logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

sentinel=None
queue = Queue.Queue()
num_threads = 5

def foobar_task(queue):
    while True:
        n = queue.get()
        logger.info('task called: {n}'.format(n=n))
        if n is sentinel: break
        n=random.random()
        if n > .25:
            logger.info("task appended to queue")
            queue.put(n)
        queue.task_done()

# set up initial queue
for i in range(num_threads):
    queue.put(i)

threads=[threading.Thread(target=foobar_task,args=(queue,))
         for n in range(num_threads)]
for t in threads:
    t.start()

queue.join()
for i in range(num_threads):
    queue.put(sentinel)

for t in threads:
    t.join()
logger.info("threads are closed")
...