Один готовый вариант, который может помочь с этим, - twisted.internet.defer.DeferredSemaphore
. Это асинхронная версия нормального (счетного) семафора, который вы, возможно, уже знаете, если вы проделали много многопоточное программирование.
Семафор (считающий) очень похож на мьютекс (замок). Но в тех случаях, когда мьютекс может быть получен только один раз до соответствующего выпуска, семафор (счетный) может быть сконфигурирован так, чтобы разрешать произвольное (но заданное) число приобретений до успешного выполнения, прежде чем требуются какие-либо соответствующие релизы.
Вот пример использования DeferredSemaphore
для запуска десяти асинхронных операций, но для запуска не более трех из них одновременно:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
также имеет явные методы acquire
и release
, но метод run
настолько удобен, что почти всегда соответствует желаемому. Он вызывает метод acquire
, который возвращает Deferred
. К этому первому Deferred
он добавляет функцию обратного вызова, которая вызывает переданную функцию (вместе с позиционными аргументами или аргументами ключевых слов). Если эта функция возвращает Deferred
, то к этой второй Deferred
добавляется обратный вызов, который вызывает метод release
.
Синхронный случай также обрабатывается путем немедленного вызова release
. Ошибки также обрабатываются, позволяя им распространяться, но при этом убедитесь, что необходимые release
сделаны для того, чтобы DeferredSemaphore
оставался в согласованном состоянии. Результат функции, переданной в run
(или результат Deferred
, который она возвращает) становится результатом Deferred
, возвращаемым run
.
Другой возможный подход может быть основан на DeferredQueue
и cooperate
. DeferredQueue
в основном похож на обычную очередь, но его метод get
возвращает Deferred
. Если в момент вызова в очереди нет элементов, Deferred
не сработает, пока элемент не будет добавлен.
Вот пример:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
Обратите внимание, что рабочая функция async
такая же, как и в первом примере. Однако на этот раз есть также функция worker
, которая явно извлекает задания из DeferredQueue
и обрабатывает их с помощью async
(добавляя async
в качестве обратного вызова к Deferred
, возвращаемому get
) , Генератор worker
приводится в действие cooperate
, который повторяет его один раз после каждого Deferred
, который приводит к пожарам. Затем основной цикл запускает три из этих рабочих генераторов, так что в любой момент времени будут выполняться три задания.
Этот подход включает в себя немного больше кода, чем DeferredSemaphore
, но имеет некоторые преимущества, которые могут быть интересны. Во-первых, cooperate
возвращает экземпляр CooperativeTask
, который имеет полезные методы, такие как pause
, resume
и пару других. Кроме того, все задания, назначенные одному и тому же кооператору, будут взаимодействовать друг с другом при планировании, чтобы не перегружать цикл обработки событий (и это то, что дает API его имя). Со стороны DeferredQueue
также можно установить ограничения на количество ожидающих обработки элементов, поэтому вы можете избежать полной перегрузки вашего сервера (например, если ваши процессоры изображений зависают и перестают выполнять задачи). Если код, вызывающий put
, обрабатывает исключение переполнения очереди, вы можете использовать это как давление, чтобы попытаться прекратить принимать новые задания (возможно, перенести их на другой сервер или предупредить администратора). Делать подобные вещи с DeferredSemaphore
немного сложнее, поскольку нет способа ограничить количество заданий, ожидающих получения семафора.