Почему asyncio.StreamWriter.drain должен вызываться явно? - PullRequest
0 голосов
/ 14 декабря 2018

Из документа: https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamWriter.write

запись (данные)

Write data to the stream.

This method is not subject to flow control. Calls to write() should be followed by drain().

утечка сопрограммы ()

Wait until it is appropriate to resume writing to the stream. Example:

writer.write(data)
await writer.drain()

Из чего японимаю,

  • Вы должны вызывать drain каждый раз, когда вызывается write.
  • Если я не думаю, write заблокирует поток цикла

Тогда почему write не является сопрограммой, которая вызывает его автоматически?Зачем звонить write без необходимости сливать?Я могу вспомнить два случая:

  1. Вы хотите write и close немедленно
  2. Вам необходимо буферизовать некоторые данные до того, как сообщение будет завершено.

Во-первых, это особый случай, я думаю, у нас может быть другой API.Буферизация должна обрабатываться внутри функции записи, и приложению должно быть все равно.


Позвольте мне задать вопрос по-другому.В чем недостаток этого?Эффективно ли это делает версия python3.8?

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()

Примечание: drain doc явно заявляет следующее:

Когда ждать нечего, сток () немедленно возвращается.


Читая ответ и ссылки снова, я думаю, что функции работают следующим образом. Примечание : Проверьте принятый ответ для более точной версии.

def write(data):
    remaining = socket.try_write(data)
    if remaining:
        _pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data

async def drain():
    if len(_pendingbuffer) < BUF_LIMIT:
        return
    await wait_until_other_side_is_up_to_speed()
    assert len(_pendingbuffer) < BUF_LIMIT

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()        

Итак, когда использовать что:

  1. Когда данные не являются непрерывными, Как реагирование наHTTP-запрос.Нам просто нужно отправить некоторые данные и не заботиться о том, когда они достигнуты, и память не имеет значения - просто используйте write
  2. То же, что и выше, но память является проблемой, используйте awrite
  3. При потоковой передаче данных большому количеству клиентов (например, потоку в реальном времени или огромному файлу).Если данные дублируются в каждом из буферов соединения, это определенно переполнит ОЗУ.В этом случае напишите цикл, который принимает порцию данных на каждой итерации и вызовите awrite.В случае огромного файла, loop.sendfile лучше, если доступно.

1 Ответ

0 голосов
/ 14 декабря 2018

Из того, что я понимаю, (1) Вам нужно вызывать сток каждый раз, когда вызывается запись.(2) Если я не думаю, что запись заблокирует поток цикла

Ни то, ни другое, но путаница вполне понятна.write() работает следующим образом:

  • Вызов write() просто сохраняет данные в буфере, оставляя их в цикле событий, чтобы фактически записать их позже.время и без дальнейшего вмешательства со стороны программы.Что касается приложения, данные записываются в фоновом режиме так быстро, как другая сторона способна их получить.Другими словами, каждый write() будет планировать передачу своих данных, используя столько записей на уровне ОС, сколько потребуется, причем эти записи будут выполняться, когда соответствующий файловый дескриптор фактически доступен для записи.Все это происходит автоматически, даже не ожидая drain().

  • write() - не сопрограмма, и она абсолютно никогда не блокирует цикл обработки событий.

Второе свойство звучит удобно, но на самом деле это главный недостаток из write().Запись отделена от принятия данных, поэтому, если вы пишете данные быстрее, чем ваш коллега может их прочитать, внутренний буфер будет расти, и у вас будет утечка памяти в ваших руках.Ожидание drain() приостанавливает сопрограмму, как только буфер становится слишком большим.Вам не нужно ждать drain() после каждой записи, но вам нужно время от времени ждать, обычно между циклами итераций.Например:

while True:
    response = await peer1.readline()
    peer2.write(b'<response>')
    peer2.write(response)
    peer2.write(b'</response>')
    await peer2.drain()

drain() немедленно возвращается, если количество ожидающих неписанных данных мало.Если данные превышают высокий порог, drain() приостанавливает вызывающую сопрограмму до тех пор, пока количество ожидающих неписанных данных не опустится ниже низкого порога.Пауза приведет к тому, что сопрограмма прекратит чтение с peer1, что, в свою очередь, заставит одноранговый узел замедлить скорость, с которой он отправляет нам данные.Этот вид обратной связи называется обратным давлением.

Python 3.8 будет поддерживать ожидание write напрямую , что устранит необходимость явных вызовов drain().

Буферизация должна обрабатываться внутри функции записи, и приложению должно быть все равно.

Это в значительной степени то, как write() работает сейчас - он обрабатывает буферизацию и позволяет приложению не заботиться, для лучшего или худшего.Также см. этот ответ для получения дополнительной информации.


Обращаясь к отредактированной части вопроса:

Читая ответ и ссылки снова, я думаю, чтофункции работают следующим образом.

write() все еще немного умнее, чем это.Он не будет пытаться записывать только один раз, он фактически организует продолжение записи данных до тех пор, пока не останется данных для записи.Это произойдет, даже если вы никогда не дождетесь drain() - единственное, что приложение должно сделать, это позволить циклу событий работать достаточно долго, чтобы записать все.

Более правильный псевдокод writeи drain может выглядеть так:

class ToyWriter:
    def __init__(self):
        self._buf = bytearray()
        self._empty = asyncio.Event(True)

    def write(self, data):
        self._buf.extend(data)
        loop.add_writer(self._fd, self._do_write)
        self._empty.clear()

    def _do_write(self):
        # Automatically invoked by the event loop when the
        # file descriptor is writable, regardless of whether
        # anyone calls drain()
        while self._buf:
            try:
                nwritten = os.write(self._fd, self._buf)
            except OSError as e:
                if e.errno == errno.EWOULDBLOCK:
                    return  # continue once we're writable again
                raise
            self._buf = self._buf[nwritten:]
        self._empty.set()
        loop.remove_writer(self._fd, self._do_write)

    async def drain(self):
        if len(self._buf) > 64*1024:
            await self._empty.wait()

Реальная реализация более сложна, потому что:

  • написана поверх стиля Twisted уровень транспорта / протокола со своим сложным управлением потоком , не поверх os.write;
  • , потому что drain() действительно не ждет, пока буферпустой, но до достижения нижнего водяного знака ;
  • исключения, отличные от EWOULDBLOCK, заданные в _do_write, сохраняются и повторно повышаются в drain().

Последний пункт - еще одна веская причина для вызова drain(), чтобы фактически заметить, что узел пропал из-за сбоя при записи.

...