Я понял основную идею цикла событий. Существует центральный цикл, прослушивающий набор файловых дескрипторов, и, если он готов к чтению или записи, выполняется соответствующий обратный вызов.
Мы можем использовать совместные процедуры вместо обратных вызовов, поскольку они могут быть приостановлены и возобновлены. Однако это означает, что должен существовать некоторый протокол связи между сопрограммой и циклом обработки событий, чтобы все работало правильно?
Я написал простой Echo Server с сопрограммами, который выдает fd вместе с заинтересованным действием, таким как yield fd, 'read'
, yield fd, 'write'
и т. Д., А затем цикл обработки событий регистрирует select
соответственно. Обратным вызовом было бы просто возобновить сопрограмму. Он работает нормально, и я добавил код ниже.
Теперь я просто пытаюсь понять, как на самом деле работает await
. Похоже, что он не дает fds и соответствующее действие, как мой пример кода, вместо этого он дает объект Future
Так что же происходит под капотом? Как оно взаимодействует с циклом событий?
Я предполагаю, что await async.sleep(1)
будет казнен так:
- Цикл событий будет выполнять подпрограмму и достигнет
async.sleep(1)
.
- Будет создан
Future
Объект.
- Затем он создаст fd, вероятно, используя
timerfd_create
с обратным вызовом для завершения Future
.
- Затем он отправит его в цикл событий для отслеживания.
await
выдаст объект Future
в цикл обработки событий, который его выполняет.
- Цикл Event установит функцию обратного вызова объекта
Future
, чтобы просто возобновить сопрограмму.
Я имею в виду, что могу использовать Future
вот так. Но так ли это на самом деле? Может кто-нибудь помочь мне понять это немного лучше?
PS: timerfd_create
был взят в качестве примера, потому что я не мог понять, как таймеры могут быть реализованы в цикле событий. Для этого вопроса сетевые fds тоже будут найдены. Если кто-то может помочь мне с тем, как реализован таймер, это было бы неплохо!
Вот моя реализация простого Echo Server с использованием подпрограмм:
"""
Tasks are just generators or coroutines
"""
import socket
import selectors
select = selectors.DefaultSelector()
tasks_to_complete = []
def create_server(port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
hostname = socket.gethostname()
s.bind((hostname, port))
s.listen(5)
print("Starting server on hostname at port %s %s" % (hostname, port))
return s
def handle_clients(s):
while True:
print("yielding for read on server %s" % id(s))
yield (s, 'read')
c, a = s.accept()
t = handle_client(c)
print("appending a client handler")
tasks_to_complete.append(t)
def handle_client(c):
while True:
print("yielding for read client %s" % id(c))
yield (c, 'read')
data = c.recv(1024)
if len(data) == 0:
return "Connection Closed"
print("yielding for write on client %s" % id(c))
yield (c, 'write')
c.send(bytes(data))
def run(tasks_to_complete):
while True:
while tasks_to_complete:
t = tasks_to_complete.pop(0)
try:
fd, event = t.send(None)
if event == 'read':
event = selectors.EVENT_READ
elif event == 'write':
event = selectors.EVENT_WRITE
def context_callback(fd, t):
def callback():
select.unregister(fd)
tasks_to_complete.append(t)
return callback
select.register(fd, event, context_callback(fd, t))
except StopIteration as e:
print(e.value)
events = select.select()
for key, mask in events:
callback = key.data
callback()
tasks_to_complete.append(handle_clients(create_server(9000)))
run(tasks_to_complete)