Как я могу читать по одной строке из трио ReceiveStream? - PullRequest
0 голосов
/ 02 декабря 2018

asyncio имеет StreamReader.readline(), что позволяет что-то вроде:

while True:
    line = await reader.readline()
    ...

(я не вижу async for доступным в asyncio, но это будет очевидная эволюция)

Как мне получить эквивалент с трио?

Я не вижу никакой поддержки высокого уровня в трио 0,9.Все, что я вижу, это ReceiveStream.receive_some(), который возвращает двоичные фрагменты произвольного размера;мне кажется нетривиальным декодировать и преобразовывать это во что-то построчное.Можно ли использовать стандартную библиотечную функцию или фрагмент?Я нашел модуль io stdlib, который выглядит многообещающе, но я не вижу способа предоставить метод «feed».

Ответы [ 2 ]

0 голосов
/ 02 декабря 2018

Вы правы, в данный момент в Trio нет поддержки высокого уровня.Должно быть что-то , хотя я не уверен на 100%, как это должно выглядеть.Я открыл вопрос , чтобы обсудить его.

А пока ваша реализация выглядит разумной.

Если вы хотите сделать ее еще более надежной, вы можете (1) использовать bytearray вместо bytes для вашего буфера, чтобы добавить и удалить амортизированный O (n) вместо O (n ^ 2), (2) установить ограничение на максимальную длину строки, чтобы злые коллеги моглине заставлять вас тратить бесконечную память на буферизацию бесконечно длинных строк, (3) возобновлять каждый вызов до find в том месте, где остановился последний, вместо того, чтобы каждый раз перезапускаться с начала, снова, чтобы избежать O (n ^ 2)поведение.Ничто из этого не является супер важным, если вы имеете дело только с разумными длинами строк и хорошо себя ведущими пирами, но это тоже не повредит.

Вот измененная версия вашего кода, которая пытается включить этитри идеи:

class LineReader:
    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        self._line_generator = self.generate_lines(max_line_length)

    @staticmethod
    def generate_lines(max_line_length):
        buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                # no b'\n' found in buf
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                # next time, start the search where this one left off
                find_start = len(buf)
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del buf[:newline_idx+1]
                # next time, start the search from the beginning
                find_start = 0
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

(не стесняйтесь использовать по любой лицензии.)

0 голосов
/ 02 декабря 2018

Я закончил тем, что написал это.Не проверено должным образом (исправления приветствуются), но похоже, что оно работает:

class LineReader:
    def __init__(self, stream):
        self.stream = stream
        self._line_generator = self.generate_lines()

    @staticmethod
    def generate_lines():
        buf = bytes()
        while True:
            newline_idx = buf.find(b'\n')
            if newline_idx < 0:
                # no b'\n' found in buf
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                buf = buf[newline_idx+1:]
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

Затем я могу обернуть ReceiveStream с помощью LineReader и использовать метод readline.Добавление __aiter__() и __anext()__ было бы тривиально, но мне это не нужно в моем случае (я портирую что-то на трио, которое в любом случае не использует async for).

Другойнедостаток в том, что он предполагает UTF-8 или аналогичную кодировку, где b'\n' переводы строк существуют в неизмененном объекте закодированных байтов.

Было бы неплохо полагаться на библиотечные функции, чтобы справиться с этим;другие ответы приветствуются.

...