критика этого кода Python (сканер с пулом потоков) - PullRequest
0 голосов
/ 06 сентября 2010

насколько хорош этот код на питоне?нужна критика) в этом коде есть ошибка, иногда скрипт выдает "ВСЕ ПОДОЖДИТЕ - МОЙ ФИНИШ!"и заморозить (больше никаких действий не происходит ..), но я не могу найти причину, почему это произошло?

сканер сайта с пулом потоков:

import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread

W_WAIT = 1
W_WORK = 0

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, pool, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()
        self.pool = pool
        self.state = None

    def is_wait(self):
        return self.state == W_WAIT


    def run(self):
        while True:
            #if all workers wait - time to exsit
            print "CHECK WAIT: !!! ",self.pool.is_all_wait()
            if self.pool.is_all_wait():
                print "ALL WAIT - CAN FINISH!"
                return
            try:
                func, args, kargs = self.tasks.get(timeout=3)
            except Empty:
                print "task wait timeout"
                continue

            self.state = W_WORK
            print "START !!! in thread %s" % str(self)
            #print args

            try: func(*args, **kargs)
            except Exception, e: print e
            print "!!! STOP in thread %s", str(self)
            self.tasks.task_done()
            self.state = W_WAIT
            #threads can fast empty it!
            #if self.tasks.qsize() == 0:
            #    print "QUIT!!!!!!"
            #    break

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        #self.tasks = Queue(num_threads)
        self.tasks = Queue()
        self.workers = []
        for _ in range(num_threads): 
            self.workers.append(Worker(self,self.tasks))


    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def is_all_wait(self):
        for w in self.workers:
            if not w.is_wait():
                return False
        return True

visited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0

def process(pool,host,url):

    try:

        content = urlopen(url).read()
    except UnicodeDecodeError:
        return


    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
        try:
            href = link['href']
        except KeyError:
            continue


        if not href.startswith('http://'):
            href = 'http://%s%s' % (host, href)
        if not href.startswith('http://%s%s' % (host, '/')):
            continue

        internal_links_set.add(href)


        if href not in visited:
            visited.add(href)
            pool.add_task(process,pool,host,href)

        else:
            pass

def start(host,charset):
    pool = ThreadPool(20)
    pool.add_task(process,pool,host,'http://%s/' % (host))
    pool.wait_completion()

start('evgenm.com','utf8') 

Спасибо за помощь!я делаю новую реализацию: что вы можете сказать об этом коде # 2?================================== ПОПРОБУЙТЕ 2 =======================================

    import sys
    from urllib import urlopen
    from BeautifulSoup import BeautifulSoup, SoupStrainer
    import re
    from Queue import Queue, Empty
    from threading import Thread


    W_STOP = 1

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, pool, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.pool = pool
        self.state = None
        self.start()



    def stop(self):
        self.state = W_STOP

    def run(self):
        while True:
            if self.state == W_STOP:
                print "\ncalled stop"
                break
            try:
                func, args, kargs = self.tasks.get(timeout=3)
            except Empty:
                continue
            print "\n***START*** %s" % str(self)
            try: 
                func(*args, **kargs)
            except Exception, e: 
                print e
            print "\n***STOP*** %s", str(self)
            self.tasks.task_done()



class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        #self.tasks = Queue(num_threads)
        self.tasks = Queue()
        self.workers = []
        for _ in range(num_threads): 
            self.workers.append(Worker(self,self.tasks))


    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def stop_threads(self):
        for w in self.workers:
            w.stop()

    def wait_stop(self):
        self.wait_completion()
        self.stop_threads()



    visited = set()
    queue = Queue()
    external_links_set = set()
    internal_links_set = set()
    external_links = 0

    def process(pool,host,url):

        try:

            content = urlopen(url).read()
        except UnicodeDecodeError:
            return


        for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
            try:
                href = link['href']
            except KeyError:
                continue


            if not href.startswith('http://'):
                href = 'http://%s%s' % (host, href)
            if not href.startswith('http://%s%s' % (host, '/')):
                continue

            internal_links_set.add(href)


            if href not in visited:
                visited.add(href)
                pool.add_task(process,pool,host,href)

            else:
                pass

    def start(host,charset):
        pool = ThreadPool(20)
        pool.add_task(process,pool,host,'http://%s/' % (host))
        pool.wait_stop()

    start('evgenm.com','utf8') 

Ответы [ 2 ]

1 голос
/ 06 сентября 2010

Вы разделяете состояние между потоками (т. Е. В is_all_wait) без синхронизации. Кроме того, тот факт, что все потоки «ожидают», не является надежным индикатором того, что очередь пуста (например, все они могут находиться в процессе получения задачи). Я подозреваю, что иногда потоки выходят до того, как очередь действительно пуста. Если это случается достаточно часто, у вас останутся задачи в очереди, но нет потоков для их выполнения. Так что queue.join() будет ждать вечно.

Моя рекомендация:

  1. Избавьтесь от is_all_wait - это не надежный показатель
  2. Избавиться от задачи state - это на самом деле не нужно
  3. Положитесь на queue.join, чтобы сообщить, когда все будет обработано

Если вам нужно уничтожить потоки (например, это часть большой, долго работающей программы), сделайте это после queue.join().

0 голосов
/ 06 сентября 2010

У меня есть базовые знания Python, но потоки в Python не бесполезны? Я видел множество статей, критикующих интерпретатор глобальной блокировки за это.

...