Поток Python: что мне не хватает? (task_done () вызывается слишком много раз) - PullRequest
3 голосов
/ 16 сентября 2011

Приношу свои извинения за длительный пост.Надеюсь, это даст достаточно контекста для решения.Я попытался создать служебную функцию, которая будет принимать любое количество старых classmethod s и помещать их в многопоточную очередь:

class QueuedCall(threading.Thread):

    def __init__(self, name, queue, fn, args, cb):
        threading.Thread.__init__(self)
        self.name = name

        self._cb = cb
        self._fn = fn
        self._queue = queue
        self._args = args

        self.daemon = True
        self.start()

    def run(self):
        r = self._fn(*self._args) if self._args is not None \
            else self._fn()

        if self._cb is not None:
            self._cb(self.name, r)

            self._queue.task_done()

Вот как выглядит мой вызывающий код (внутри класса)

data = {}
def __op_complete(name, r):
    data[name] = r

q = Queue.Queue()

socket.setdefaulttimeout(5)

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))
q.put(QueuedCall('so_answers', q, StackExchange.get_answers,
    ['api.stackoverflow.com', 534476, 5], __op_complete))
q.put(QueuedCall('so_user', q, StackExchange.get_user_info,
    ['api.stackoverflow.com', 534476], __op_complete))
q.put(QueuedCall('p_answers', q, StackExchange.get_answers,
    ['api.programmers.stackexchange.com', 23901, 5], __op_complete))
q.put(QueuedCall('p_user', q, StackExchange.get_user_info,
    ['api.programmers.stackexchange.com', 23901], __op_complete))
q.put(QueuedCall('fb_image', q, Facebook.get_latest_picture, None, __op_complete))

q.join()
return data

Проблема, с которой я здесь сталкиваюсь, заключается в том, что она, похоже, работает каждый раз при новом перезапуске сервера, но завершается ошибкой каждый второй или третий запрос с ошибкой:

ValueError: task_done() called too many times

Эта ошибка проявляется в случайном потоке каждую секунду или третий запрос, поэтому довольно трудно определить точно в чем проблема.

У кого-нибудь есть идеи и / или предложения?

Спасибо.


Редактировать:

Я добавил print s в попытке отладить это (быстро и грязно, а не ведение журнала).Один оператор печати (print 'running thread: %s' % self.name) в первой строке run, а другой прямо перед вызовом task_done() (print 'thread done: %s' % self.name).

Вывод успешного запроса:

running thread: twitter
running thread: so_answers
running thread: so_user
running thread: p_answers
thread done: twitter
thread done: so_user
running thread: p_user
thread done: so_answers
running thread: fb_image
thread done: p_answers
thread done: p_user
thread done: fb_image

Вывод неудачного запроса:

running thread: twitter
running thread: so_answers
thread done: twitter
thread done: so_answers
running thread: so_user
thread done: so_user
running thread: p_answers
thread done: p_answers
Exception in thread p_answers:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/home/demian/src/www/projects/demianbrecht/demianbrecht/demianbrecht/helpers.py", line 37, in run
    self._queue.task_done()
  File "/usr/lib/python2.7/Queue.py", line 64, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times

running thread: p_user
thread done: p_user
running thread: fb_image
thread done: fb_image

Ответы [ 2 ]

6 голосов
/ 16 сентября 2011

Ваш подход к этой проблеме "нетрадиционный".Но игнорируя это пока ... проблема в том, что в коде, который вы дали

q.put(QueuedCall('twitter', q, Twitter.get_status, [5,], __op_complete))

, вполне возможно, что следующий рабочий процесс произойдет

  1. Потокпостроен и запущен QueuedCall .__ init __
  2. Затем он помещается в очередь q.Однако ... прежде чем очередь завершит свою логику для вставки элемента, независимый поток уже завершил свою работу и попытался вызвать q.task_done ().Что вызывает у вас ошибку (task_done () был вызван до того, как объект был благополучно помещен в очередь)

Как это должно быть сделано?Вы не вставляете потоки в очереди.Очереди содержат данные, которые обрабатывают потоки.Поэтому вместо этого вы

  • создаете очередь.Вставьте в него нужные вам задания (например, функции, аргументы и обратный вызов)
  • Вы создаете и запускаете рабочие потоки
  • Рабочий поток вызывает
    • q.get () чтобы получить функцию для вызова
    • вызывает ее
    • вызывает q.task_done (), чтобы сообщить очереди, что элемент был обработан.
2 голосов
/ 16 сентября 2011

Возможно, я здесь неправильно понимаю, но я не уверен, что вы правильно используете Queue.

Из краткого обзора документов видно, что идея заключается в том, что вы можете использовать метод put, чтобы поместить работу в Queue, тогда другой поток может вызвать get, чтобы получить некоторую работу. , сделайте работу, а затем позвоните task_done когда она закончится.

Похоже, что ваш код делает put экземпляров QueuedCall в очередь. Ничто никогда не get из очереди, но экземплярам QueuedCall также передается ссылка на очередь, в которую они вставляются, и они выполняют свою работу (о которой они знают по сути, а не потому, что они get это из очереди), а затем позвоните task_done.

Если мое прочтение всего правильного (и вы не вызываете метод get из другого места, которого я не вижу), то я полагаю, что понимаю проблему.

Проблема в том, что экземпляры QueuedCall должны быть созданы до того, как они могут быть помещены в очередь, и процесс их создания начинает свою работу в другом потоке. Если поток завершает свою работу и вызывает task_done до , основной поток сумел put QueuedCall в очередь, тогда вы можете получить сообщение об ошибке.

Я думаю, что это работает только тогда, когда вы запускаете его впервые случайно. GIL очень помогает вам; маловероятно, что поток QueuedCall действительно получит GIL и начнет работать немедленно. Тот факт, что вы на самом деле не заботитесь об очереди, кроме как о счетчике, также «помогает», это, кажется, работает: не имеет значения, если QueuedCall еще не попал в очередь, пока он не пуст ( этот QueuedCall может просто task_done другой элемент в очереди, и к тому моменту, когда этот элемент *1034* вызовет task_done, этот элемент, мы надеемся, будет в очереди, и он может быть помечен как выполненный этим) , И добавление sleep также заставляет новые потоки немного подождать, давая основному потоку время, чтобы убедиться, что они на самом деле находятся в очереди, именно поэтому это также маскирует проблему.

Также обратите внимание, что, насколько я могу судить по некоторым быстрым манипуляциям с интерактивной оболочкой, ваша очередь на самом деле все еще заполнена в конце, потому что вы на самом деле ничего не делаете из этого. Он просто получил количество task_done сообщений, равное количеству вещей, которые были put в нем, поэтому join работает.

Я думаю, вам нужно кардинально изменить дизайн класса QueuedCall или использовать другой примитив синхронизации, отличный от Queue. Queue предназначен для использования в очереди работ для уже существующих рабочих потоков. Запуск потока из конструктора для объекта, который вы помещаете в очередь, не совсем подходит.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...