Как правильно добавить FileTransport в Asyncio? - PullRequest
0 голосов
/ 30 июня 2018

Я пишу приложение, которое читает текстовые данные и действует на них. Текстовые данные могут поступать из порта TCP или из текстового файла (который содержит данные, ранее считанные из порта TCP и заархивированные). Я пишу это на Python 3, и использование asyncio кажется очевидным инструментом для использования.

Использовать Streams API open_connection() просто, чтобы открыть порт TCP и прочитать его. Архитектура asyncio имеет концепцию Transport и Protocol для нижнего и верхнего уровней ввода-вывода. Так что, похоже, мне следует реализовать транспорт для чтения текста из файла и передать его в протокол. Это позволит мне отделить остальную часть моего приложения от того, были ли текстовые данные получены из порта TCP или из файла.

Но мне трудно понять, как сказать asyncio , чтобы использовать мой любимый транспорт.

  • Streams API open_connection() имеет список параметров, который связан с TCP-портом Transport, без возможности указать другой Transport, а тем более параметры, такие как путь к файлу.
  • open_connection() оборачивается и звонит loop.create_connection(). Это так же специализировано для транспорта портов TCP. Еще сейчас способ предоставить другой транспорт.
  • Реализация loop.create_connection() получает свой Транспортный объект из self._make_ssl_transport() или self._make_socket_transport(). У них есть альтернативные реализации в asyncio.selector_events.BaseSelectorEventLoop и asyncio.proactor_events.BaseProactorEventLoop, поэтому мы явно подошли к тому моменту, когда должен был быть выбран транспорт файлов.

Мне не хватает места, где asyncio позволяет мне сказать, какой транспорт использовать? Или asyncio действительно закодировано до его корней, чтобы использовать свой собственный порт TCP и UDP-дейтаграмму Transports, и больше ничего?

Если я хочу разрешить возможность использования моего собственного транспорта с asyncio , похоже, что мне нужно расширить цикл событий или написать более гибкую альтернативу create_connection(), которая закодирована для конкретного реализация цикла событий. Это похоже на большую работу и уязвимо для изменений в реализации.

Или это глупое поручение обрабатывать ввод файлов с помощью транспорта? Должен ли я вместо этого структурировать свой код, чтобы сказать:

if (using_tcp_port): await asyncio.open_connection(....) else: completely_different_file_implementation(....)

1 Ответ

0 голосов
/ 30 июня 2018

Согласно документации из API create_connection(), принимает протокол, а создает потоковое транспорт, который является TCP-соединением. Так что это не должен быть API для пользовательских транспортов.

Однако идея повторно использовать один и тот же протокол либо для транспорта TCP, либо для пользовательского транспорта файлов, действительна. Это не будет «совсем другая реализация», но, по крайней мере, без использования create_connection(). Давайте предположим, что это read_file():

def my_protocol_factory():
    return your_protocol

if using_tcp_port:
    transport, protocol = await loop.create_connection(my_protocol_factory, host, port)
else:
    transport, protocol = await read_file(loop, my_protocol_factory, path_to_file)

Тогда у вас будет что-то вроде этого:

from asyncio import transports

import aiofiles  # https://github.com/Tinche/aiofiles


def read_file(loop, protocol_factory, path):
    protocol = protocol_factory()
    transport = FileTransport(path, loop)
    transport.set_protocol(protocol)
    return transport, protocol


class FileTransport(transports.ReadTransport):
    def __init__(self, path, loop):
        super().__init__()
        self._path = path
        self._loop = loop
        self._closing = False

    def is_closing(self):
        return self._closing

    def close(self):
        self._closing = True

    def set_protocol(self, protocol):
        self._protocol = protocol
        self._loop.create_task(self._do_read())

    def get_protocol(self):
        return self._protocol

    async def _do_read(self):
        try:
            async with aiofiles.open(self._path) as f:
                self._loop.call_soon(self._protocol.connection_made, self)
                async for line in f:
                    self._loop.call_soon(self._protocol.data_received, line)
                    if self._closing:
                        break
                self._loop.call_soon(self._protocol.eof_received)
        except Exception as ex:
            self._loop.call_soon(self._protocol.connection_lost, ex)
        else:
            self._loop.call_soon(self._protocol.connection_lost, None)
...