Python3 ThreadPoolExecutor застрял после того, как очередь пуста - PullRequest
0 голосов
/ 08 февраля 2019

Я определил ThreadPoolExecutor с определенным размером очереди (блокировка)
После выполнения задач он не завершается, так как застревает при некоторых условиях.

from concurrent.futures import ThreadPoolExecutor
import queue


class ThreadPoolExecutorWithQueueSizeLimit(ThreadPoolExecutor):
    ''' 
    Sub-classing ThreadPoolExecutor to implement
    blocking queue feature, not currently available.
    We used queue.Queue module here, which is blocking in nature when maxsize != 0
    Reference: https://stackoverflow.com/questions/48263704/threadpoolexecutor-how-to-limit-the-queue-maxsize
    '''
    def __init__(self, maxsize=0, *args, **kwargs):
        super(ThreadPoolExecutorWithQueueSizeLimit, self).__init__(*args, **kwargs)
        self._work_queue = queue.Queue(maxsize=maxsize)

exe = ThreadPoolExecutorWithQueueSizeLimit(maxsize=2, max_workers=3)

l = []

def f(args):
    a = args[0]
    import time
    time.sleep(2)
    b = args[1]
    print ((a, b), exe._work_queue.qsize())
    return (a, b)
for i in range(5):
    for j in range(4):
        l.append((j, j+1))
    print (list(exe.map(f, l, timeout=1)))
print (exe._work_queue.qsize())
exe.shutdown()
print ("done")

После того, как я нажму команду + C (mac), она выдаст следующую трассировку (вместе с выводом):

Traceback (most recent call last):
  File "test.py", line 30, in <module>
    print (list(exe.map(f, l, timeout=1)))
  File "/export/apps/python/3.6/lib/python3.6/concurrent/futures/_base.py", line 558, in result_iterator
    yield future.result(end_time - time.time())
  File "/export/apps/python/3.6/lib/python3.6/concurrent/futures/_base.py", line 407, in result
    raise TimeoutError()
concurrent.futures._base.TimeoutError
(0, 1) 2
(2, 3) 2
(1, 2) 2
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/export/apps/python/3.6/lib/python3.6/queue.py", line 133, in put
    self.not_full.wait()
  File "/export/apps/python/3.6/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt

Итак, я хочу знать, почему программа зависает на acqu () ичто нужно сделать, чтобы освободить его, поскольку размер очереди также равен 0 в этой точке.
Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...