Огонь DeferredList после нескольких обратных вызовов - PullRequest
0 голосов
/ 25 апреля 2018

A twisted.internet.defer.DeferredList делает это:

Я объединяю группу отложений в один обратный вызов.

Я отслеживаю список отложенных для их обратных вызовови сделайте один обратный вызов, когда все они завершены, список (успех, результат) кортежей, «успех» - логическое значение.

Обратите внимание, что вы все равно можете использовать Deferred после помещения его в DeferredList.Например, вы можете подавить сообщения «Необработанная ошибка в отложенных», добавив ошибки в Deferreds после , поместив их в DeferredList, так как DeferredList не поглотит ошибки.(Хотя более удобный способ сделать это - просто установить флаг потреблениеErrors)

def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0, consumeErrors=0): (source)
    overrides twisted.internet.defer.Deferred.__init__
    Initialize a DeferredList.
    Parameters  deferredList    The list of deferreds to track. (type: list of Deferreds )
    fireOnOneCallback   (keyword param) a flag indicating that only one callback needs to be fired for me to call my callback
    fireOnOneErrback    (keyword param) a flag indicating that only one errback needs to be fired for me to call my errback
    consumeErrors   (keyword param) a flag indicating that any errors raised in the original deferreds should be consumed by this DeferredList. This is useful to prevent spurious warnings being logged.

В частности:

fireOnOneCallback (ключевое слово param) флаг, указывающийчто для вызова моего обратного вызова

мне нужен только один обратный вызов. Я ищу поведение, подобное fireOnOneCallback=True, но вместо этого запускаю n обратные вызовы.Я пытался сделать это, но это уже превращается в беспорядок.Я уверен, что есть лучший способ.

def _get_fired_index(deferred_list):
    for index, (success, value) in enumerate(deferred_list):
        if success:
            return index
    raise ValueError('No deferreds were fired.')


def _fire_on_other_callback(already_fired_index, deferred_list, callback, ):
    dlist_except_first_fired = (
        deferred_list[:already_fired_index]
        + deferred_list[already_fired_index + 1:]
    )
    dlist2 = DeferredList(dlist_except_first_fired, fireOnOneCallback=True)
    dlist2.addCallback(callback, deferred_list)


def _fire_on_two_callbacks(deferreds, callback, errback):
    dlist1 = DeferredList(deferreds, fireOnOneCallback=True)
    dlist1.addCallback(_get_fired_index)
    dlist1.addCallback(_fire_on_other_callback, deferreds, callback, errback)

Ответы [ 2 ]

0 голосов
/ 24 мая 2018

Вот еще один подход, использующий DeferredSemaphore для обработки потенциального состояния гонки.Это сработает, как только n отсрочек сработает и отменит остальные.

from twisted.internet import defer


def fireAfterNthCallback(deferreds, n):
    if not n or n > len(deferreds):
        raise ValueError

    results = {}
    finished_deferred = defer.Deferred()
    sem = defer.DeferredSemaphore(1)

    def wrap_sem(result, index):
        return sem.run(callback_result, result, index)

    def cancel_remaining():
        finished = [deferreds[index] for index in results.keys()]
        for d in finished:
            deferreds.remove(d)
        for d in deferreds:
            d.addErrback(lambda err: err.trap(defer.CancelledError))
            d.cancel()

    def callback_result(result, index):
        results[index] = result
        if len(results) >= n:
            cancel_remaining()
            finished_deferred.callback(results.values())
        return result

    for deferred_index, deferred in enumerate(deferreds):
        deferred.addCallback(wrap_sem, deferred_index)

    return finished_deferred
0 голосов
/ 25 апреля 2018

Вот один из возможных подходов.

from __future__ import print_function

import attr
from twisted.internet.defer import Deferred

def fireOnN(n, ds):
    acc = _Accumulator(n)
    for index, d in enumerate(ds):
        d.addCallback(acc.one_result, index)
    return acc.n_results

@attr.s
class _Accumulator(object):
    n = attr.ib()
    so_far = attr.ib(default=attr.Factory(dict))
    done = attr.ib(default=False)
    n_results = attr.ib(default=attr.Factory(Deferred))

    def one_result(self, result, index):
        if self.done:
            return result
        self.so_far[index] = result
        if len(self.so_far) == self.n:
            self.done = True
            so_far = self.so_far
            self.so_far = None
            self.n_results.callback(so_far)

dx = list(Deferred().addCallback(print, i) for i in range(3))
done = fireOnN(2, dx)
done.addCallback(print, "done")

for i, d in enumerate(dx):
    d.callback("result {}".format(i))

Обратите внимание, что эта реализация не работает с ошибками и, возможно, имеет другие недостатки (например, удержание ссылки n_results). Тем не менее, основная идея заключается в звуке: накапливать состояние от обратных вызовов до тех пор, пока не будет достигнуто желаемое условие, а затем запустить другое отложенное.

DeferredList только добавляет ненужную сложность к этой проблеме с ее несвязанными функциями и интерфейсом, не предназначенным для решения этой проблемы.

...