Поток Python не останавливается, хотя демон имеет значение True - PullRequest
0 голосов
/ 24 ноября 2018

Я предполагаю, что это очевидно, но я делаю скрипт, который использует многопоточность для тестирования прокси, и должен остановиться, когда обнаружит определенное количество прокси.Когда я запускаю его, потоки перестают генерировать вывод, если условие выполнено, но программа не закрывается.Я смотрел на другие подобные проблемы, но, похоже, не смог успешно их реализовать.Буду признателен за любые указатели.

import queue
import threading

import time   
import urllib.request

class ThreadUrl(threading.Thread):

    def __init__(self, queue, working_proxies):
        threading.Thread.__init__(self)
        self.queue = queue
        self.working_proxies = working_proxies

    def run(self):
        while len(self.working_proxies)<5:

            proxy = self.queue.get()

            try:
                proxy_handler = urllib.request.ProxyHandler({'http': proxy})
                opener = urllib.request.build_opener(proxy_handler)
                opener.addheaders = [('User-agent',
                                      'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36')]
                urllib.request.install_opener(opener)
                req = urllib.request.Request('http://www.wikipedia.org')
                sock=urllib.request.urlopen(req)
                print(f'{proxy} works')
                with appending_lock:
                    self.working_proxies.append(proxy)

            except urllib.request.HTTPError as e:
                print('Error code: ', e.code)

            except Exception as detail:
                print("ERROR:", detail)

            self.queue.task_done()


def main(proxies, working_proxies):
    for i in range(5):
        t = ThreadUrl(queue,working_proxies)
        t.daemon = True
        t.start()


        for proxy in proxies:
            queue.put(proxy)

    queue.join()


if __name__ == '__main__':
    start = time.time()

    appending_lock = threading.Lock()
    proxies = [...list of proxies...]
    working_proxies = []

    queue = queue.Queue()

    main(proxies, working_proxies)
    print("Elapsed Time: %s" % (time.time() - start))

Неправильно ли я использую атрибут daemon или есть другие параметры, которые я должен установить, чтобы обеспечить остановку потоков?

1 Ответ

0 голосов
/ 25 ноября 2018

После просмотра документов и нескольких других ресурсов я обнаружил, что потоки закрылись должным образом, но функция queue.join () блокировалась до тех пор, пока очередь не была очищена.Поскольку этого не произойдет, если потоки будут уничтожены до завершения всех элементов очереди, сценарий продолжит выполняться.

Итак, я переопределил queue.join (), чтобы он выглядел так:

queue = queue.Queue()

def waiter(queue):
    while not queue.empty() and dead == False:
        pass

queue.join = waiter

Переменная 'dead' доступна в потоке, если она устанавливает, что условие закрытия выполнено.В приведенном выше коде вместо «len (self.working_proxies) <5» в цикле while функции run () будет помещено «dead».Каждый раз, когда новый элемент добавляется к self.working_proxies, сценарий должен проверять, выполнено ли условие для завершения, и, если это так, установить для «dead» значение True. </p>

Кроме того, «dead» являетсяглобальная переменная, поэтому ее не нужно передавать в новую функцию waiter ().

Я уверен, что есть более элегантный способ решения проблемы, но пока что этот должен помочь..

...