Ошибка многопроцессорной отладки - PullRequest
1 голос
/ 02 августа 2009

Привет всем, у меня небольшие проблемы с отладкой моего кода. Пожалуйста, смотрите ниже:

import globalFunc
from globalFunc import systemPrint
from globalFunc import out
from globalFunc import debug
import math
import time
import multiprocessing

"""
Somehow this is not working well
"""
class urlServerM( multiprocessing.Process):

    """
    This calculates how much links get put into the priority queue
    so to reach the level that we intend, for every query resultset,
    we will put the a certain number of links into visitNext first,
    and even if every resultSet is full, we will be able to achieve the link
    level that we intended. The rest is pushed into another list where
    if the first set of lists don't have max for every time, the remaining will
    be spared on these links
    """
    def getPriorityCounter(self, level, constraint):
                return int( math.exp( ( math.log(constraint) / (level - 1) ) ) )


    def __init__( self, level, constraint, urlQ):
        """limit is obtained via ngCrawler.getPriorityNum"""
        multiprocessing.Process.__init__(self)
        self.constraint = int( constraint)
        self.limit = self.getPriorityCounter( level, self.constraint)
        self.visitNext = []
        self.visitLater = []
        self._count = 0
        self.urlQ = urlQ


    """
    puts the next into the Queue
    """
    def putNextIntoQ(self):
        debug('putNextIntoQ', str(self.visitNext) + str(self.visitLater) )
        if self.visitNext != []:
            _tmp = self.visitNext[0]
            self.visitNext.remove(_tmp)
            self.urlQ.put(_tmp)

        elif self.visitLater != []:
            _tmp = self.visitLater[0]
            self.visitLater.remove(_tmp)
            self.urlQ.put(_tmp)


    def run(self):
        while True:
            if self.hasNext():
                time.sleep(0.5)
                self.putNextIntoQ()
                debug('process', 'put something in Q already')
            else:
                out('process', 'Nothing in visitNext or visitLater, sleeping')
                time.sleep(2)
        return


    def hasNext(self):
        debug( 'hasnext', str(self.visitNext) + str(self.visitLater) )
        if self.visitNext != []:
            return True
        elif self.visitLater != []:
            return True
        return False


    """
    This function resets the counter 
    which is used to keep track of how much is already inside the 
    visitNext vs visitLater
    """
    def reset(self): 
        self._count = 0


    def store(self, linkS):
        """Stores a link into one of these list"""
        if self._count < self.limit:
            self.visitNext.append( linkS)
            debug('put', 'something is put inside visitNext')
        else: 
            self.visitLater.append( linkS)
            debug('put', 'something is put inside visitLater')
        self._count += 1



if __name__ == "__main__":
    #   def __init__( self, level, constraint, urlQ):

    from multiprocessing import Queue
    q = Queue(3)
    us = urlServerM( 3, 6000, q)

    us.start()
    time.sleep(2)

    # only one thread will do this
    us.store('http://www.google.com')
    debug('put', 'put completed')

    time.sleep(3)

    print q.get_nowait()
    time.sleep(3)

И это вывод

OUTPUT
DEBUG hasnext: [][]
[process]   Nothing in visitNext or visitLater, sleeping
DEBUG put: something is put inside visitNext
DEBUG put: put completed
DEBUG hasnext: [][]
[process]   Nothing in visitNext or visitLater, sleeping
DEBUG hasnext: [][]
[process]   Nothing in visitNext or visitLater, sleeping
Traceback (most recent call last):
  File "urlServerM.py", line 112, in <module>
    print q.get_nowait()
  File "/usr/lib/python2.6/multiprocessing/queues.py", line 122, in get_nowait
    return self.get(False)
  File "/usr/lib/python2.6/multiprocessing/queues.py", line 104, in get
    raise Empty
Queue.Empty
DEBUG hasnext: [][]

Очевидно, я нахожу это действительно странным. В общем, что это за код, что при тестировании в main () он запускает процесс, а затем сохраняет http://www.google.com в классе visitNext, а затем я просто хочу увидеть, как его помещают в очередь.

Однако согласно выводу Я нахожу крайне странным, что, хотя мой класс завершил сохранение URL-адреса в классе, hasNext ничего не показывает. Любое тело знает почему? Это лучший способ написать run () в непрерывном цикле while? Это действительно необходимо? Я в основном пытаюсь поэкспериментировать с модулем многопроцессорности, и у меня есть пул рабочих (из multiprocessing.Pool), которым необходимо получить эти URL из этого класса (единая точка входа). Является ли использование очереди лучшим способом? Нужно ли мне делать это «живым» процессом, поскольку каждый работник просит об этом из очереди, и если у меня нет способа сообщить об этом моему urlServer, чтобы что-то поместить в очередь, я не могу придумать менее трудный способ.

1 Ответ

0 голосов
/ 03 августа 2009

Вы используете многопроцессорность, поэтому память не распределяется между основным исполнением и вашим urlserver.

т.е. Я думаю, что это фактически Noop: {us.store('http://www.google.com')}, потому что когда он выполняется в основном потоке, он изменяет только представление основных потоков {us}. Вы можете подтвердить, что URL находится в памяти основного потока, вызвав {us.hasnext()} до {q.get_nowait()}.

Чтобы это работало, вам нужно превратить все списки, которыми вы хотите поделиться, в Queue-s или Pipe-s. В качестве альтернативы вы можете просто изменить свою модель на {threading}, и она должна работать без изменений (более или менее - вам придется блокировать списки посещений, и у вас снова возникают проблемы с GIL).

(И да - пожалуйста, отредактируйте ваши вопросы лучше в следующий раз. Я знал, в чем может быть ваша проблема, как только увидел "многопроцессорность", но в противном случае я бы вообще не стал смотреть на код ...)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...