Лично я не большой поклонник глобальных переменных для условий потоков, во многом потому, что видел то, с чем вы сталкивались раньше.Причина кроется в документации по Python для Queue.get.
Если необязательный блок args имеет значение true, а время ожидания равно None (по умолчанию), блокируйте, если необходимо, до тех пор, пока элемент не станет доступен.
По сути, вы никогда не увидитевторая проверка по while run:
, потому что out_queue.get()
заблокировалась на неопределенный срок после опустошения очереди.
Лучший способ сделать это, ИМХО, - это использовать значения часового в очереди или использовать get_nowait и catchисключение для разрыва цикла.Примеры:
Sentinels
class DatamineThread(threading.Thread):
def run(self):
while True:
data = self.out_queue.get()
if data == "time to quit": break
# non-sentinel processing here.
Попробуйте / исключите
class DatamineThread(threading.Thread):
def run(self):
while True:
try:
data = self.out_queue.get_nowait() # also, out_queue.get(False)
except Queue.Empty: break
# data processing here.
Чтобы убедиться, что все ваши потоки заканчиваются, можете сделать это несколькими способами:
Добавить Стражей для каждого работника
for i in range(numWorkers):
out_queue.put('time to quit')
out_queue.join()
Заменить Стража
class DatamineThread(threading.Thread):
def run(self):
while True:
data = self.out_queue.get()
if data == "time to quit":
self.out_queue.put('time to quit')
break
# non-sentinel processing here.
Любой путь должен работать.Что предпочтительнее, зависит от того, как заполнен out_queue.Если он может быть добавлен / удален рабочими потоками, первый подход предпочтительнее.Позвоните join()
, затем добавьте часовых, затем снова наберите join()
.Второй подход хорош, если вы не хотите помнить, сколько рабочих потоков вы создали - он использует только одно значение часового и не загромождает очередь.