Я пытаюсь проверить асинхронную функцию в Python 3.6.8 вместе с:
asyncio==3.4.3
requests==2.22.0
responses==0.10.6
pytest==4.6.3
pytest-asyncio==0.10.0
Я запрашиваю несколько URL-адресов одновременно, но урезанная функция выглядит примерно так:
async def check_urls_tester(url: str) -> int:
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
loop = asyncio.get_event_loop()
futures = [loop.run_in_executor(executor, requests.head, url)]
response = await asyncio.gather(*futures, return_exceptions=True)
return int(response[0].status_code)
Моя попытка теста выглядит так:
@responses.activate
@pytest.mark.asyncio
async def test_check_urls_async(self):
responses.add(responses.HEAD, "https://www.google.com", status=404)
resp = await check_urls_tester("https://www.google.com")
assert resp == 404
Я ожидаю, что моя функция вернет 404, поскольку я использую отклики для макетирования запросов, но это возвращает 200. Я могу успешно использовать ответы с неасинхронными функциями, но не с этой асинхронной функцией .
Какой шаг я здесь пропускаю?