Из того, что я понимаю, (1) Вам нужно вызывать сток каждый раз, когда вызывается запись.(2) Если я не думаю, что запись заблокирует поток цикла
Ни то, ни другое, но путаница вполне понятна.write()
работает следующим образом:
Вызов write()
просто сохраняет данные в буфере, оставляя их в цикле событий, чтобы фактически записать их позже.время и без дальнейшего вмешательства со стороны программы.Что касается приложения, данные записываются в фоновом режиме так быстро, как другая сторона способна их получить.Другими словами, каждый write()
будет планировать передачу своих данных, используя столько записей на уровне ОС, сколько потребуется, причем эти записи будут выполняться, когда соответствующий файловый дескриптор фактически доступен для записи.Все это происходит автоматически, даже не ожидая drain()
.
write()
- не сопрограмма, и она абсолютно никогда не блокирует цикл обработки событий.
Второе свойство звучит удобно, но на самом деле это главный недостаток из write()
.Запись отделена от принятия данных, поэтому, если вы пишете данные быстрее, чем ваш коллега может их прочитать, внутренний буфер будет расти, и у вас будет утечка памяти в ваших руках.Ожидание drain()
приостанавливает сопрограмму, как только буфер становится слишком большим.Вам не нужно ждать drain()
после каждой записи, но вам нужно время от времени ждать, обычно между циклами итераций.Например:
while True:
response = await peer1.readline()
peer2.write(b'<response>')
peer2.write(response)
peer2.write(b'</response>')
await peer2.drain()
drain()
немедленно возвращается, если количество ожидающих неписанных данных мало.Если данные превышают высокий порог, drain()
приостанавливает вызывающую сопрограмму до тех пор, пока количество ожидающих неписанных данных не опустится ниже низкого порога.Пауза приведет к тому, что сопрограмма прекратит чтение с peer1
, что, в свою очередь, заставит одноранговый узел замедлить скорость, с которой он отправляет нам данные.Этот вид обратной связи называется обратным давлением.
Python 3.8 будет поддерживать ожидание write
напрямую , что устранит необходимость явных вызовов drain()
.
Буферизация должна обрабатываться внутри функции записи, и приложению должно быть все равно.
Это в значительной степени то, как write()
работает сейчас - он обрабатывает буферизацию и позволяет приложению не заботиться, для лучшего или худшего.Также см. этот ответ для получения дополнительной информации.
Обращаясь к отредактированной части вопроса:
Читая ответ и ссылки снова, я думаю, чтофункции работают следующим образом.
write()
все еще немного умнее, чем это.Он не будет пытаться записывать только один раз, он фактически организует продолжение записи данных до тех пор, пока не останется данных для записи.Это произойдет, даже если вы никогда не дождетесь drain()
- единственное, что приложение должно сделать, это позволить циклу событий работать достаточно долго, чтобы записать все.
Более правильный псевдокод write
и drain
может выглядеть так:
class ToyWriter:
def __init__(self):
self._buf = bytearray()
self._empty = asyncio.Event(True)
def write(self, data):
self._buf.extend(data)
loop.add_writer(self._fd, self._do_write)
self._empty.clear()
def _do_write(self):
# Automatically invoked by the event loop when the
# file descriptor is writable, regardless of whether
# anyone calls drain()
while self._buf:
try:
nwritten = os.write(self._fd, self._buf)
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
return # continue once we're writable again
raise
self._buf = self._buf[nwritten:]
self._empty.set()
loop.remove_writer(self._fd, self._do_write)
async def drain(self):
if len(self._buf) > 64*1024:
await self._empty.wait()
Реальная реализация более сложна, потому что:
- написана поверх стиля Twisted уровень транспорта / протокола со своим сложным управлением потоком , не поверх
os.write
; - , потому что
drain()
действительно не ждет, пока буферпустой, но до достижения нижнего водяного знака ; - исключения, отличные от
EWOULDBLOCK
, заданные в _do_write
, сохраняются и повторно повышаются в drain()
.
Последний пункт - еще одна веская причина для вызова drain()
, чтобы фактически заметить, что узел пропал из-за сбоя при записи.