Очередь удаленных звонков перспективному брокеру Python Twisted? - PullRequest
11 голосов
/ 19 мая 2010

Сила Twisted (для python) заключается в его асинхронной структуре (я думаю). Я написал сервер обработки изображений, который принимает запросы через Perspective Broker. Это прекрасно работает, пока я кормлю меньше двухсот изображений за раз. Однако иногда в него попадают сотни изображений практически одновременно. Поскольку он пытается обрабатывать их все одновременно, сервер падает.

В качестве решения я бы хотел поставить в очередь remote_calls на сервере, чтобы он обрабатывал только ~ 100 изображений одновременно. Кажется, что это может быть что-то, что Twisted уже делает, но я не могу найти это. Любые идеи о том, как начать реализацию этого? Точка в правильном направлении? Спасибо!

Ответы [ 2 ]

29 голосов
/ 19 мая 2010

Один готовый вариант, который может помочь с этим, - twisted.internet.defer.DeferredSemaphore. Это асинхронная версия нормального (счетного) семафора, который вы, возможно, уже знаете, если вы проделали много многопоточное программирование.

Семафор (считающий) очень похож на мьютекс (замок). Но в тех случаях, когда мьютекс может быть получен только один раз до соответствующего выпуска, семафор (счетный) может быть сконфигурирован так, чтобы разрешать произвольное (но заданное) число приобретений до успешного выполнения, прежде чем требуются какие-либо соответствующие релизы.

Вот пример использования DeferredSemaphore для запуска десяти асинхронных операций, но для запуска не более трех из них одновременно:

from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def main():
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))

    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()


if __name__ == '__main__':
    main()

DeferredSemaphore также имеет явные методы acquire и release, но метод run настолько удобен, что почти всегда соответствует желаемому. Он вызывает метод acquire, который возвращает Deferred. К этому первому Deferred он добавляет функцию обратного вызова, которая вызывает переданную функцию (вместе с позиционными аргументами или аргументами ключевых слов). Если эта функция возвращает Deferred, то к этой второй Deferred добавляется обратный вызов, который вызывает метод release.

Синхронный случай также обрабатывается путем немедленного вызова release. Ошибки также обрабатываются, позволяя им распространяться, но при этом убедитесь, что необходимые release сделаны для того, чтобы DeferredSemaphore оставался в согласованном состоянии. Результат функции, переданной в run (или результат Deferred, который она возвращает) становится результатом Deferred, возвращаемым run.

Другой возможный подход может быть основан на DeferredQueue и cooperate. DeferredQueue в основном похож на обычную очередь, но его метод get возвращает Deferred. Если в момент вызова в очереди нет элементов, Deferred не сработает, пока элемент не будет добавлен.

Вот пример:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)


def worker(jobs):
    while True:
        yield jobs.get().addCallback(async)


def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()


if __name__ == '__main__':
    main()

Обратите внимание, что рабочая функция async такая же, как и в первом примере. Однако на этот раз есть также функция worker, которая явно извлекает задания из DeferredQueue и обрабатывает их с помощью async (добавляя async в качестве обратного вызова к Deferred, возвращаемому get) , Генератор worker приводится в действие cooperate, который повторяет его один раз после каждого Deferred, который приводит к пожарам. Затем основной цикл запускает три из этих рабочих генераторов, так что в любой момент времени будут выполняться три задания.

Этот подход включает в себя немного больше кода, чем DeferredSemaphore, но имеет некоторые преимущества, которые могут быть интересны. Во-первых, cooperate возвращает экземпляр CooperativeTask, который имеет полезные методы, такие как pause, resume и пару других. Кроме того, все задания, назначенные одному и тому же кооператору, будут взаимодействовать друг с другом при планировании, чтобы не перегружать цикл обработки событий (и это то, что дает API его имя). Со стороны DeferredQueue также можно установить ограничения на количество ожидающих обработки элементов, поэтому вы можете избежать полной перегрузки вашего сервера (например, если ваши процессоры изображений зависают и перестают выполнять задачи). Если код, вызывающий put, обрабатывает исключение переполнения очереди, вы можете использовать это как давление, чтобы попытаться прекратить принимать новые задания (возможно, перенести их на другой сервер или предупредить администратора). Делать подобные вещи с DeferredSemaphore немного сложнее, поскольку нет способа ограничить количество заданий, ожидающих получения семафора.

0 голосов
/ 24 мая 2010

Вам также может понравиться txRDQ (Resizable Dispatch Queue), который я написал. Google это, это находится в коллекции TX на LaunchPad. Извините, у меня нет больше времени для ответа - я собираюсь выйти на сцену.

Терри

...