Как объект Future используется в asyncio Python? - PullRequest
0 голосов
/ 07 мая 2018

Я понял основную идею цикла событий. Существует центральный цикл, прослушивающий набор файловых дескрипторов, и, если он готов к чтению или записи, выполняется соответствующий обратный вызов.

Мы можем использовать совместные процедуры вместо обратных вызовов, поскольку они могут быть приостановлены и возобновлены. Однако это означает, что должен существовать некоторый протокол связи между сопрограммой и циклом обработки событий, чтобы все работало правильно?

Я написал простой Echo Server с сопрограммами, который выдает fd вместе с заинтересованным действием, таким как yield fd, 'read', yield fd, 'write' и т. Д., А затем цикл обработки событий регистрирует select соответственно. Обратным вызовом было бы просто возобновить сопрограмму. Он работает нормально, и я добавил код ниже.

Теперь я просто пытаюсь понять, как на самом деле работает await. Похоже, что он не дает fds и соответствующее действие, как мой пример кода, вместо этого он дает объект Future Так что же происходит под капотом? Как оно взаимодействует с циклом событий?

Я предполагаю, что await async.sleep(1) будет казнен так:

  1. Цикл событий будет выполнять подпрограмму и достигнет async.sleep(1).
  2. Будет создан Future Объект.
  3. Затем он создаст fd, вероятно, используя timerfd_create с обратным вызовом для завершения Future.
  4. Затем он отправит его в цикл событий для отслеживания.
  5. await выдаст объект Future в цикл обработки событий, который его выполняет.
  6. Цикл Event установит функцию обратного вызова объекта Future, чтобы просто возобновить сопрограмму.

Я имею в виду, что могу использовать Future вот так. Но так ли это на самом деле? Может кто-нибудь помочь мне понять это немного лучше?

PS: timerfd_create был взят в качестве примера, потому что я не мог понять, как таймеры могут быть реализованы в цикле событий. Для этого вопроса сетевые fds тоже будут найдены. Если кто-то может помочь мне с тем, как реализован таймер, это было бы неплохо!

Вот моя реализация простого Echo Server с использованием подпрограмм:

"""
Tasks are just generators or coroutines
"""
import socket
import selectors

select = selectors.DefaultSelector()
tasks_to_complete = []

def create_server(port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    hostname = socket.gethostname()
    s.bind((hostname, port))
    s.listen(5)
    print("Starting server on hostname at port %s %s" % (hostname, port))
    return s

def handle_clients(s):
    while True:
        print("yielding for read on server %s" % id(s))
        yield (s, 'read')
        c, a = s.accept()
        t = handle_client(c)
        print("appending a client handler")
        tasks_to_complete.append(t)

def handle_client(c):
    while True:
        print("yielding for read client %s" % id(c))
        yield (c, 'read')
        data = c.recv(1024)
        if len(data) == 0:
            return "Connection Closed"
        print("yielding for write on client %s" % id(c))
        yield (c, 'write')
        c.send(bytes(data))

def run(tasks_to_complete):
    while True:
        while tasks_to_complete:
            t = tasks_to_complete.pop(0)
            try:
                fd, event = t.send(None)
                if event == 'read':
                    event = selectors.EVENT_READ
                elif event == 'write':
                    event = selectors.EVENT_WRITE
                def context_callback(fd, t):
                    def callback():
                        select.unregister(fd)
                        tasks_to_complete.append(t)
                    return callback
                select.register(fd, event, context_callback(fd, t))
            except StopIteration as e:
                print(e.value)
        events = select.select()
        for key, mask in events:
            callback = key.data
            callback()

tasks_to_complete.append(handle_clients(create_server(9000)))

run(tasks_to_complete)

1 Ответ

0 голосов
/ 07 мая 2018

Я написал простой Echo Server с сопрограммами, который выдает fd вместе с заинтересованным действием, таким как yield fd, 'read', yield fd, 'write' и т. Д., А затем цикл обработки событий регистрирует select соответственно.

Это похоже на то, как работает curio Дэйва Бизли. Чтобы узнать больше о концепции, посмотрите эту лекцию , где он строит цикл событий из самых основ. (Он использует синтаксис до 3.5 yield from, но он работает точно так же, как await.)

Как вы обнаружили, asyncio работает немного по-другому, хотя принцип все еще схож.

Теперь я просто пытаюсь понять, как на самом деле работает await. Похоже, он не выдает fds и строку, соответствующую действию, как в примере выше, вместо этого он дает объект Future. Так что же происходит под капотом? Как это происходит с Event Loop?

Короткая версия заключается в том, что блокирующие сопрограммы используют глобальную переменную (через asyncio.get_event_loop()) для извлечения цикла событий. У цикла событий есть методы, которые планируют обратные вызовы, которые будут вызываться, когда происходит интересное событие. asyncio.sleep вызывает loop.call_later, чтобы обеспечить его возобновление по истечении времени ожидания.

Полученный Future является просто удобным способом уведомления цикла событий о результате, как только он будет готов, чтобы он мог правильно возобновить ожидание Task (сопрограммы, управляемой циклом событий), которая ожидала операция блокировки, а также обработка исключений и отмены. См. Task.__step для кровавых подробностей .

timerfd_create был взят в качестве примера, потому что я не мог понять, как таймеры могут быть реализованы в цикле событий.

Таймеры реализованы таким образом, что цикл событий отслеживает как файловые дескрипторы, так и тайм-ауты, и выдает select, который завершается, когда истекает самый ранний таймаут . Лекция Дэйва, связанная выше, демонстрирует концепцию лаконично.

...