Я пытаюсь реализовать пул потоков, который обрабатывает очередь задач, используя ThreadPool
и Queue
. Он начинается с начальной очереди задач, а затем каждая из задач может также помещать дополнительные задачи в очередь задач. Проблема в том, что я не знаю, как блокировать, пока очередь не станет пустой и пул потоков не завершит обработку, но все равно проверю очередь и отправлю любые новые задачи в пул потоков, которые были помещены в очередь. Я не могу просто позвонить ThreadPool.join()
, потому что мне нужно держать пул открытым для новых задач.
Например:
from multiprocessing.pool import ThreadPool
from Queue import Queue
from random import random
import time
import threading
queue = Queue()
pool = ThreadPool()
stdout_lock = threading.Lock()
def foobar_task():
with stdout_lock: print "task called"
if random() > .25:
with stdout_lock: print "task appended to queue"
queue.append(foobar_task)
time.sleep(1)
# set up initial queue
for n in range(5):
queue.put(foobar_task)
# run the thread pool
while not queue.empty():
task = queue.get()
pool.apply_async(task)
with stdout_lock: print "pool is closed"
pool.close()
pool.join()
Это выводит:
pool is closed
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue
task called
task appended to queue
Это выходит из цикла while до того, как foobar_tasks добавляются в очередь, поэтому добавленные задачи никогда не передаются в пул потоков. Я не могу найти способ определить, есть ли в пуле потоков все еще активные рабочие потоки. Я попробовал следующее:
while not queue.empty() or any(worker.is_alive() for worker in pool._pool):
if not queue.empty():
task = queue.get()
pool.apply_async(task)
else:
with stdout_lock: print "waiting for worker threads to complete..."
time.sleep(1)
Но, похоже, worker.is_alive()
всегда возвращает true, поэтому это входит в бесконечный цикл.
Есть ли лучший способ сделать это?