Как дать отложенный ответ AMP, не блокируя систему? - PullRequest
4 голосов
/ 10 августа 2011

(я очень открыт для предложений по улучшению названия.)

Я использую протокол AMP через Twisted для создания планировщика, который передает задания своим агентам. Агенты извлекают задания из планировщика, поэтому планировщик является сервером AMP, а агенты подключаются как клиенты.

Идея состоит в том, чтобы агент подключился, выбрал задание из верхней части (внутреннего планировщика) очереди заданий и затем продолжил свое выполнение. Однако эта очередь не всегда будет непустой. Таким образом, я рассчитываю воспользоваться преимуществом закрученной отложенной механики, чтобы просто иметь отложенный огонь на стороне агента, когда планировщику удалось выскакивать задание из очереди.

Реализация этого на стороне планировщика оказывается немного сложной, хотя. Способ работы AMP заключается в назначении функции каждой (предопределенной мной) команде, которую агент может отправить, при этом функция принимает все аргументы команды и возвращает словарь всех возвращаемых значений. Это означает, что мне нужно сделать все это из одной функции. Как правило, это не было бы проблемой, но здесь, похоже, мешает витая: мне нужно немного приостановить функцию, не останавливая витой цикл событий, таким образом позволяя ему фактически добавлять больше заданий в очередь, поэтому можно оторваться (По этой причине я не думаю, что обычный sleep() будет иметь желаемый эффект.) Что более важно, это означает, что я не могу придумать, как использовать какую-то искаженную функциональность, например, deferToThread(), потому что мне пришлось бы обрабатывать результаты этого (и иметь к ним только доступ) в отдельной функции, которую я бы назначил как обратный вызов deferred, поэтому я не знаю, что возвращать в Функция ответчика AMP после запуска отдельного потока и назначения его обратного вызова. Это иллюстрирует то, что я имею в виду, более четко:

def assignJob(agentID):
    # We expect the agentID, so we can store who we've given a job to.

    # Get a job without blocking even if the queue is originally empty.
    job = None
    while job is None:
        try:
            job = jobqueue.pop(0)
        except IndexError:
            # Imagine getJob simply tries to get a job every 5 seconds
            # (using sleep() safely because it's in a separate thread)
            # until it eventually gets one, which it returns
            d = deferToThread(getJob)

            # We would then need to have a separate function
            # , e.g. jobReturn() pick up the firing deferred and do
            # something with the result...
            d.addCallback(jobReturn)   

    # But if we do... We don't (necessarily) have a job to return here
    # because for all we know, the deferred from that thread hasn't even
    # fired yet.
    return {'job': ???}

(Это, очевидно, не фактический полный код функции - например, это метод для подкласса amp.AMP, как требуется.)

Реакторный метод callInThread() также поначалу кажется полезным (поскольку он не возвращает отложенное значение), но он не предлагает способа получить возвращаемое значение вызываемого объекта, который он выполняет (насколько я могу). см.) и, даже если бы это было так, это означало бы ожидание завершения потока, что блокировало бы этот метод на долгое время, что делает использование отдельного потока бессмысленным.

Так как мне заблокировать этот метод до тех пор, пока у меня не появится работа, но не весь цикл событий Twisted или, в качестве альтернативы, как я могу вернуть AMP-ответ вне его метода немедленного ответа?

1 Ответ

4 голосов
/ 10 августа 2011

Одна вещь, которую вы, возможно, пропустили, заключается в том, что сам метод респондента AMP также может возвращать Deferred (поиск может также возвращать Deferreds в AMP API документах ).Пока Deferred в конечном итоге срабатывает со словарем, который соответствует определению ответа команды, все будет работать нормально.

Также несколько связано, если вы хотите избежать использования потоков, вы можете взглянуть на twisted.internet.defer.DeferredQueue , структура данных очереди, которая изначально знает о Deferreds.

...