python: самостоятельно управляемый класс потоков, основанный на глубине очереди - PullRequest
2 голосов
/ 25 ноября 2011

пожалуйста, рассмотрите следующий пример кода:

class DbWorker(Thread):
    def __init__(self, dbws, dbwa, dbwm, dbq, dbqx, logr):
        self.__dbws = dbws      # db worker semaphore 
        self.__dbwa = dbwa      # db workers active counter
        self.__dbwm = dbwm      # db workers max active 
        self.__dbQ  = dbq       # db queue 
        self.__dbqx = dbqx      # db query execution counter
        #self.__rq  = rq        # response queue 
        self.__logr = logr      # logger

        self.__lock = threading.Lock()
        self.__id   = "DB_"+str(time.time())

        try: 
            self.__dbConn = mysql.MySQL(DBHOST, DBUSER, DBPASS, DBNAME, self.__id)

        except MySQLdb, e: 
            self.__msg = 'EXCEPTION: %s - %d: %s' % (str(time.ctime()), e.args[0], e.args[1])
            self.__lorg.write ("DEBUG", "mysql", self.__msg)

        """ Acquire semaphore """
        self.__dbws.acquire()

            """ Acquire semaphore """
        self.__lock.acquire()
        Thread.__init__(self, name = self.__id)


    def run(self):
        while 1:

            self.__qi = self.__dbQ.get()
            #print "ITEM: " + str(self.__qi)
            # start up new thread if q not empty and active workers limit not reached  

            if (self.__dbwa.status() < self.__dbwm ) and (self.__dbQ.qsize() > self.__dbwa.status() - 1):
                DbWorker(self.__dbws, self.__dbwa, self.__dbwm, self.__dbQ, self.__dbqx, self.__logr).start() 
                self.__dbwa.increase()


            if self.__qi == None:

                #self._dbwa = global DBWactive
                if self.__dbwa.status() > 1:
                    self.__dbwa.decrease()
                    self.__dbConn.closeConn()
                    self.__dbws.release()
                    self.__lock.release()
                    self.__dbQ.task_done()
                    break

                else:
                    self.__dbQ.task_done()
                    time.sleep(1/2)

            # just execute query
            elif self.__qi[0] == "exec":
                self.__logr.write("DEBUG", "SQLQR", self.__qi[1])
                self.__qry = self.__qi[1]

                try:
                    self.__result = self.__dbConn.insert(self.__qry)

                except MySQLdb, e:
                    self.__msg = 'ERROR: %s %s \n %d: %s' % (str(self.__id), str(self.__qry), e.args[0], eargs[1])
                    self.__logr.write("DEBUG", "dbworker", self.__msg)

                self.__dbQ.task_done()
                self.__dbqx.increase()


            if (self.__dbQ.qsize() + 1) < self.__dbwa.status():
                    while self.__dbwa.status() > self.__dbQ.qsize()+1:
                        self.__dbQ.put(None)

Я бы хотел узнать ваше мнение по поводу:

это хороший подход к управлению потоками? как я вижу, основной процесс время от времени зависает, и мне нужно его перезапустить.

Можете ли вы предвидеть какие-либо проблемы, когда один поток остается активным для управления очередью и запускаете несколько потоков, если это необходимо?

почему я вижу, что основной процесс завис, но нет никаких исключений из какого-либо запущенного потока или основного процесса?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...