Ожидание на мероприятии с Twisted и PB - PullRequest
1 голос
/ 22 июня 2010

У меня есть приложение на Python, которое использует несколько потоков, и мне интересно, как лучше всего ожидать чего-то в python, не записывая процессор или не блокируя GIL.

мое приложение использует витую, и я порождаю поток длязапустить длинную операцию, чтобы я не топал по нитке реактора.Эта длинная операция также порождает некоторые потоки, используя twisted deferToThread, чтобы сделать что-то еще, и исходный поток хочет дождаться результатов от defreds.

Что я делал, так это

while self._waiting:
    time.sleep( 0.01 )

который, казалось, нарушал искаженные объекты PB от получения сообщений, поэтому я подумал, что sleep блокирует GIL.Дальнейшее расследование по указанным ниже плакатам показало, что это не так.

Существуют лучшие способы ожидания потоков, не блокируя поток реактора или питона, опубликованные ниже.

Ответы [ 5 ]

13 голосов
/ 23 июня 2010

Если вы уже используете Twisted, вам никогда не нужно «ждать», как это.

Как вы это описали:

Я создаю поток для запускадлинная операция ... Эта длинная операция также порождает некоторые потоки, используя витую deferToThread ...

Это означает, что вы вызываете deferToThread из потока "длинной операции", а не из основногопоток (тот, где reactor.run() работает).Как уже заметил Жан-Поль Кальдероне в комментарии, вы можете только вызывать Twisted API (например, deferToThread) из основного потока реактора.

Запрет, который вывидение является распространенным симптомом несоблюдения этого правила.Это не имеет ничего общего с GIL, и все, что связано с тем фактом, что вы перевели реактор Twisted в нерабочее состояние.

Основываясь на вашем свободном описании вашей программы, я попытался написать примерпрограмма, которая делает то, о чем вы говорите, полностью на основе Twisted API, порождая все потоки через Twisted и контролируя их все из основного потока реактора.

import time

from twisted.internet import reactor
from twisted.internet.defer import gatherResults
from twisted.internet.threads import deferToThread, blockingCallFromThread

def workReallyHard():
    "'Work' function, invoked in a thread."
    time.sleep(0.2)

def longOperation():
    for x in range(10):
        workReallyHard()
        blockingCallFromThread(reactor, startShortOperation, x)
    result = blockingCallFromThread(reactor, gatherResults, shortOperations)
    return 'hooray', result

def shortOperation(value):
    workReallyHard()
    return value * 100

shortOperations = []

def startShortOperation(value):
    def done(result):
        print 'Short operation complete!', result
        return result
    shortOperations.append(
        deferToThread(shortOperation, value).addCallback(done))

d = deferToThread(longOperation)
def allDone(result):
    print 'Long operation complete!', result
    reactor.stop()
d.addCallback(allDone)

reactor.run()

Обратите внимание, что в точке в allDone, гдереактор остановлен, вы можете запустить еще одну «долгую операцию» и снова запустить процесс.

5 голосов
/ 23 июня 2010

Я недавно узнал, что звонит time.sleep( X ) заблокирует GIL для все время X и, следовательно, заморозить ВСЕ темы Python за это время период.

Вы ошиблись - это определенно не , как это работает. В каком источнике вы нашли эту неверную информацию?

В любом случае, тогда вы уточняете (в комментариях - лучше отредактируйте свой Q!), Что вы используете deferToThread и ваша проблема с этим заключается в том, что ...:

Ну да, я откладываю действие на нить и дать крутой обратный вызов. Но родительский поток должен ждать для всей серии суб тем завершить, прежде чем он может перейти на новый набор подпотоков для порождения

Так что используйте в качестве обратного вызова метод объекта со счетчиком - начните его с 0, увеличивайте его на единицу каждый раз, когда вы переносите поток, и уменьшайте на единицу в методе обратного вызова.

Когда метод обратного вызова видит, что уменьшенный счетчик вернулся к 0, он знает, что мы закончили "завершать всю серию подпотоков", а затем пришло время "перейти к новому set of sub threads to spawn "и, таким образом, только в этом случае вызывает функцию или метод" spawn a new set of sub threads "- это так просто!

например. (за исключением опечаток и c, поскольку это непроверенный код, просто чтобы дать вам представление) ...:

class Waiter(object):

  def __init__(self, what_next, *a, **k):
    self.counter = 0
    self.what_next = what_next
    self.a = a
    self.k = k

  def one_more(self):
    self.counter += 1

  def do_wait(self, *dont_care):
    self.counter -= 1
    if self.counter == 0:
    self.what_next(*self.a, **self.k)


def spawn_one_thread(waiter, long_calculation, *a, **k):
  waiter.one_more()
  d = threads.deferToThread(long_calculation, *a, **k)
  d.addCallback(waiter.do_wait)

def spawn_all(waiter, list_of_lists_of_functions_args_and_kwds):
  if not list_of_lists_of_functions_args_and_kwds:
    return
  if waiter is None:
    waiter=Waiter(spawn_all, list_of_lists_of_functions_args_and_kwds)
  this_time = list_of_list_of_functions_args_and_kwds.pop(0)
  for f, a, k in this_time:
    spawn_one_thread(waiter, f, *a, **k)

def start_it_all(list_of_lists_of_functions_args_and_kwds):
  spawn_all(None, list_of_lists_of_functions_args_and_kwds)
5 голосов
/ 22 июня 2010

Вы пробовали условные переменные ?Они используются как

condition = Condition()

def consumer_in_thread_A():
    condition.acquire()
    try:
        while resource_not_yet_available:
            condition.wait()
        # Here, the resource is available and may be 
        # consumed
    finally:
        condition.release()

def produce_in_thread_B():
    # ... create resource, whatsoever
    condition.acquire()
    try:
        condition.notify_all()
    finally:
        condition.release()

Переменные условия действуют как блокировки (acquire и release), но их основное назначение - предоставить механизм управления, который позволяет wait для них быть notify -d или notify_all -d.

2 голосов
/ 23 июня 2010

Согласно источнику Python, time.sleep () не содержит GIL.

http://code.python.org/hg/trunk/file/98e56689c59c/Modules/timemodule.c#l920

Обратите внимание на использование Py_BEGIN_ALLOW_THREADS и Py_END_ALLOW_THREADS, как описано здесь:

http://docs.python.org/c-api/init.html#thread-state-and-the-global-interpreter-lock

1 голос
/ 22 июня 2010

Модуль threading позволяет создавать поток, который затем представляется объектом Thread. Этот объект имеет метод join, который можно использовать для ожидания завершения подпотока.

См. http://docs.python.org/library/threading.html#module-threading

...