Получение «пул закрыт» после первого теста приложения aiohttp - PullRequest
0 голосов
/ 21 ноября 2019

У меня странная проблема с тестированием приложения aiohttp.

Короче говоря, в проекте я использую:

  • gino с sqlalchemy. Я не использую любую функцию orm. Ранее использовался aiopg и проблема была та же.
  • aiohttp-pytest

Приложение является поставщиком API GraphQl. Я делаю простой запрос списка пользователей.

Все работает нормально, когда я тестирую API вручную или использую любой другой инструмент, который не включает pytest, unittest и т. Д. Я не смог воспроизвести проблему.

Но как только я запускаю тесты, первый проходит успешно, и все после перерывов и ответа возвращает ошибку: «пул закрыт». Сначала я использовал aiopg и нашел проблемы, похожие на мои, и решил попробовать другой инструмент и получил ту же самую проблему.

Only first test runs normally all others fail with same reason

Важные детали думаю стоитупоминания. Я не передаю движок на app['db'] Я написал класс для обработки соединений, а также разделил записи и чтения баз данных.

Вот как это выглядит:

class DatabaseHandler:
    """Keeps track of databases used in app"""
    __write_db_list__: List[Engine] = []
    __read_db_list__: List[Engine] = []

    def __init__(self,
                 write_dbs: Union[Iterable[Engine], None] = None,
                 read_dbs: Union[Iterable[Engine], None] = None) \
            -> None:
        """Constructor. Accepts list of write and read database engines"""
        super().__init__()

        if write_dbs:
            for url in write_dbs:
                self.add_write_db(url)

        if read_dbs:
            for url in read_dbs:
                self.add_read_db(url)

    @staticmethod
    async def url_to_engine(url):
        """Builds db engine from url"""
        return await gino.create_engine(url)

    @staticmethod
    async def build(write_urls: Iterable[str], read_urls: Iterable[str]):
        return DatabaseHandler(
            [await DatabaseHandler.url_to_engine(u) for u in write_urls],
            [await DatabaseHandler.url_to_engine(u) for u in read_urls]
        )

    def add_read_db(self, engine: Engine):
        """Adds db engine to read list"""
        self.__read_db_list__.append(engine)

    def add_write_db(self, engine: Engine):
        """Adds db engine to write list"""
        self.__write_db_list__.append(engine)

    def get_for_read(self) -> Union[Engine, None]:
        """Returns engine for reading requests (SELECT)"""

        if len(self.__read_db_list__) > 0:
            result = self.__read_db_list__[0]
        else:
            result = self.get_for_write()

        return result

    def get_for_write(self) -> Union[Engine, None]:
        """Returns engine for writing requests (INSERT, UPDATE)"""
        result = None

        if len(self.__write_db_list__) > 0:
            result = self.__write_db_list__[0]

        return result

    async def disconnect(self) -> None:
        """Method to disconect from db"""
        read_db_engines: List[Engine] = self.__read_db_list__
        for db_engine in read_db_engines:
            await db_engine.close()

        write_db_engine: List[Engine] = self.__write_db_list__
        for db_engine in write_db_engine:
            await db_engine.close()

и дБинициализация в функции запуска:

async def init_db(app: web.Application) -> None:
    """Initialize db connections"""
    app['db'] = await DatabaseHandler.build(
        [app.get('config', {}).get('WRITE_DB')],
        [app.get('config', {}).get('READ_DB')]
    )

Как выглядит тест:

class UserListTestCase(AioHTTPTestCase):
    async def get_application(self) -> web.Application:
        return init_app()

    @unittest_run_loop
    async def test_get_all_users(self):
        client: TestClient = self.client
        resp: ClientResponse = await client.post(
            ADMIN_BASE_URI,
            json=await build_gql_request(USERS_LIST, {
                "query": ""
            })
        )

        assert resp.status == 200
        raise Exception(await resp.json())
        data = (await resp.json()).get('data', {}).get('users', {})
        assert data.get('count') == 7
        assert len(data.get('result')) == 7

        resp.close()

    @unittest_run_loop
    async def test_get_users_by_status(self):
        client: TestClient = self.client
        resp: ClientResponse = await client.post(
            ADMIN_BASE_URI,
            json=await build_gql_request(USERS_LIST, {
                "query": "",
                "isActive": True
            })
        )

        assert resp.status == 200
        raise Exception(await resp.json())
        data = (await resp.json()).get('data', {}).get('user', {})
        assert data.get('count') == 4
        assert len(data.get('result')) == 4

        assert set([u.get('id') for u in data.get('result', [])]) \
               == {1, 2, 5, 7}

        resp: ClientResponse = await client.post(
            ADMIN_BASE_URI,
            json=await build_gql_request(USERS_LIST, {
                "query": "",
                "is_active": False
            })
        )

        assert resp.status == 200
        data = (await resp.json()).get('data', {}).get('user', {})
        assert data.get('count') == 3
        assert len(data.get('result')) == 3
        assert [u.get('id') for u in data.get('result', [])] == [3, 4, 6]

Я пытался выполнить тест с помощью функций, но результат был тот же. Какой бы первый тест ни выполнялся, он выполняется успешно, и все дальнейшие тесты завершаются с ошибкой «пул закрыт».

Я хотел бы понять, почему это происходит и есть ли вероятность возникновения такой же проблемы на производстве.

Мне кажется, я что-то упускаю из-за того, как работают asyncio и asyncio-среды.

Буду рад любой помощи.

...