Python закрывает несколько потоков - PullRequest
0 голосов
/ 11 января 2012

Я использую приведенный ниже пример с веб-сайта IBM.Я заметил, что потоки для DatamineThread () и ThreadUrl () остаются открытыми из-за циклов while.

Я пытаюсь завершить эти потоки и напечатать текст, сообщающий мне об этом.Я не уверен, правильно ли я поступаю, или даже если потоки должны быть завершены таким образом.Проблема в том, что когда я устанавливаю run = False в main (), циклы while читают run = True.

Любая помощь была бы отличной ... Спасибо

import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()
out_queue = Queue.Queue()
run = True

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        global run
        while run:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

        print 'ThreadUrl finished...'


class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        global run
        while run:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

        print 'DatamineThread finished...'

start = time.time()
def main():
    global run
    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

    # try and break while-loops in threads
    run = False

    time.sleep(5)


main()
print "Elapsed Time: %s" % (time.time() - start)

1 Ответ

4 голосов
/ 11 января 2012

Лично я не большой поклонник глобальных переменных для условий потоков, во многом потому, что видел то, с чем вы сталкивались раньше.Причина кроется в документации по Python для Queue.get.

Если необязательный блок args имеет значение true, а время ожидания равно None (по умолчанию), блокируйте, если необходимо, до тех пор, пока элемент не станет доступен.

По сути, вы никогда не увидитевторая проверка по while run:, потому что out_queue.get() заблокировалась на неопределенный срок после опустошения очереди.

Лучший способ сделать это, ИМХО, - это использовать значения часового в очереди или использовать get_nowait и catchисключение для разрыва цикла.Примеры:

Sentinels

class DatamineThread(threading.Thread):
    def run(self):
        while True:
            data = self.out_queue.get()
            if data == "time to quit": break
            # non-sentinel processing here.

Попробуйте / исключите

class DatamineThread(threading.Thread):
    def run(self):
        while True:
            try:
                data = self.out_queue.get_nowait() # also, out_queue.get(False)
            except Queue.Empty: break
            # data processing here.

Чтобы убедиться, что все ваши потоки заканчиваются, можете сделать это несколькими способами:

Добавить Стражей для каждого работника

for i in range(numWorkers):
  out_queue.put('time to quit')

out_queue.join()

Заменить Стража

class DatamineThread(threading.Thread):
    def run(self):
        while True:
            data = self.out_queue.get()
            if data == "time to quit": 
                self.out_queue.put('time to quit')
                break
            # non-sentinel processing here.

Любой путь должен работать.Что предпочтительнее, зависит от того, как заполнен out_queue.Если он может быть добавлен / удален рабочими потоками, первый подход предпочтительнее.Позвоните join(), затем добавьте часовых, затем снова наберите join().Второй подход хорош, если вы не хотите помнить, сколько рабочих потоков вы создали - он использует только одно значение часового и не загромождает очередь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...