У меня странная проблема с тестированием приложения aiohttp.
Короче говоря, в проекте я использую:
- gino с sqlalchemy. Я не использую любую функцию orm. Ранее использовался aiopg и проблема была та же.
- aiohttp-pytest
Приложение является поставщиком API GraphQl. Я делаю простой запрос списка пользователей.
Все работает нормально, когда я тестирую API вручную или использую любой другой инструмент, который не включает pytest, unittest и т. Д. Я не смог воспроизвести проблему.
Но как только я запускаю тесты, первый проходит успешно, и все после перерывов и ответа возвращает ошибку: «пул закрыт». Сначала я использовал aiopg и нашел проблемы, похожие на мои, и решил попробовать другой инструмент и получил ту же самую проблему.
Важные детали думаю стоитупоминания. Я не передаю движок на app['db']
Я написал класс для обработки соединений, а также разделил записи и чтения баз данных.
Вот как это выглядит:
class DatabaseHandler:
"""Keeps track of databases used in app"""
__write_db_list__: List[Engine] = []
__read_db_list__: List[Engine] = []
def __init__(self,
write_dbs: Union[Iterable[Engine], None] = None,
read_dbs: Union[Iterable[Engine], None] = None) \
-> None:
"""Constructor. Accepts list of write and read database engines"""
super().__init__()
if write_dbs:
for url in write_dbs:
self.add_write_db(url)
if read_dbs:
for url in read_dbs:
self.add_read_db(url)
@staticmethod
async def url_to_engine(url):
"""Builds db engine from url"""
return await gino.create_engine(url)
@staticmethod
async def build(write_urls: Iterable[str], read_urls: Iterable[str]):
return DatabaseHandler(
[await DatabaseHandler.url_to_engine(u) for u in write_urls],
[await DatabaseHandler.url_to_engine(u) for u in read_urls]
)
def add_read_db(self, engine: Engine):
"""Adds db engine to read list"""
self.__read_db_list__.append(engine)
def add_write_db(self, engine: Engine):
"""Adds db engine to write list"""
self.__write_db_list__.append(engine)
def get_for_read(self) -> Union[Engine, None]:
"""Returns engine for reading requests (SELECT)"""
if len(self.__read_db_list__) > 0:
result = self.__read_db_list__[0]
else:
result = self.get_for_write()
return result
def get_for_write(self) -> Union[Engine, None]:
"""Returns engine for writing requests (INSERT, UPDATE)"""
result = None
if len(self.__write_db_list__) > 0:
result = self.__write_db_list__[0]
return result
async def disconnect(self) -> None:
"""Method to disconect from db"""
read_db_engines: List[Engine] = self.__read_db_list__
for db_engine in read_db_engines:
await db_engine.close()
write_db_engine: List[Engine] = self.__write_db_list__
for db_engine in write_db_engine:
await db_engine.close()
и дБинициализация в функции запуска:
async def init_db(app: web.Application) -> None:
"""Initialize db connections"""
app['db'] = await DatabaseHandler.build(
[app.get('config', {}).get('WRITE_DB')],
[app.get('config', {}).get('READ_DB')]
)
Как выглядит тест:
class UserListTestCase(AioHTTPTestCase):
async def get_application(self) -> web.Application:
return init_app()
@unittest_run_loop
async def test_get_all_users(self):
client: TestClient = self.client
resp: ClientResponse = await client.post(
ADMIN_BASE_URI,
json=await build_gql_request(USERS_LIST, {
"query": ""
})
)
assert resp.status == 200
raise Exception(await resp.json())
data = (await resp.json()).get('data', {}).get('users', {})
assert data.get('count') == 7
assert len(data.get('result')) == 7
resp.close()
@unittest_run_loop
async def test_get_users_by_status(self):
client: TestClient = self.client
resp: ClientResponse = await client.post(
ADMIN_BASE_URI,
json=await build_gql_request(USERS_LIST, {
"query": "",
"isActive": True
})
)
assert resp.status == 200
raise Exception(await resp.json())
data = (await resp.json()).get('data', {}).get('user', {})
assert data.get('count') == 4
assert len(data.get('result')) == 4
assert set([u.get('id') for u in data.get('result', [])]) \
== {1, 2, 5, 7}
resp: ClientResponse = await client.post(
ADMIN_BASE_URI,
json=await build_gql_request(USERS_LIST, {
"query": "",
"is_active": False
})
)
assert resp.status == 200
data = (await resp.json()).get('data', {}).get('user', {})
assert data.get('count') == 3
assert len(data.get('result')) == 3
assert [u.get('id') for u in data.get('result', [])] == [3, 4, 6]
Я пытался выполнить тест с помощью функций, но результат был тот же. Какой бы первый тест ни выполнялся, он выполняется успешно, и все дальнейшие тесты завершаются с ошибкой «пул закрыт».
Я хотел бы понять, почему это происходит и есть ли вероятность возникновения такой же проблемы на производстве.
Мне кажется, я что-то упускаю из-за того, как работают asyncio и asyncio-среды.
Буду рад любой помощи.