Как проверить асинхронные потоки (сопрограммы, задачи)? - PullRequest
0 голосов
/ 27 марта 2020

Что я должен сделать, чтобы протестировать сопрограмму login?

class Client:
    def __init__(self, config=None):
        self.config = config or ('0.0.0.0', 8080)
        self.reader = None
        self.writer = None

        self.connection = asyncio.create_task(self._connect())

    async def _connect(self):
        self.reader, self.writer = await asyncio.open_connection(*self.config)

    async def login(self, username, password):
        await self.connection  # 2

        md5 = hashlib.md5()
        md5.update(username.encode(ENCODING) + password.encode(ENCODING))
        hash_ = md5.hexdigest()

        # `construct` returns message, in bytes, based on parameters.
        login_message = construct(1, username, password, 182, hash_, 157)
        self.writer.write(login_message)  # 1
        await self.writer.drain()

Более конкретно, я хочу проверить, что сообщение для входа всегда отправляется в правильном формате (1), а также, возможно, также факт, что соединение должно быть установлено в первую очередь перед отправкой сообщения (2). Кроме того, я не могу запустить локальную копию сервера, если это имеет значение.

Возможно ли это? Должен ли я даже проверить это?

Я использую pytest, я пытался использовать библиотеку под названием pytest-asyncio, но я просто не понимаю, как проверить что-то подобное.

...