Я пишу веб-сканер, используя aiohttp, и моя программа падает с ошибками "RuntimeError: Session is closed"
в моем веб-сканере.
Основной цикл проходит первую итерацию, извлекая и обрабатывая все страницы в очереди URL без каких-либо проблем.Но затем, когда он входит fetch_pages()
во 2-й итерации основного цикла и делает первый вызов aiohttp.ClientSession.session.get()
, он выдает "RuntimeError: Session is closed"
.
Я не понимаю, почему я получаю эту ошибкупотому что мне кажется, что приведенный ниже код должен создавать новый менеджер контекста aiohttp.ClientSession()
каждый раз, когда вызывается указанная ниже функция get_batch()
, и закрывать сеанс в конце вызова функции.Но этого не происходит.Может кто-нибудь объяснить мне, почему я получаю эту ошибку?
Я разместил соответствующие части своего кода ниже (я пытался урезать как можно больше, но включил ссылки на полный исходный код ниже).
Вот основной цикл:
class Crawler():
((...))
def __init__(self):
self.loop = asyncio.get_event_loop()
self.url_queue = URLQueue(maxsize=10000) # urls are popped from URL queue
self.page_queue = asyncio.PriorityQueue() # when fetched, they are placed on page queue for html processing
((...))
async def fetch_pages(self):
print("Entering fetch_page()")
pages, errors = [], []
if self.url_queue.empty():
await asyncio.sleep(1)
else:
await self.fetcher.get_batch(self.BATCH_SIZE, self.url_queue, self.page_queue, self.error_queue)
((...))
async def process_html(self): ...
async def analyze_content(self): ...
async def extract_links(self): ...
async def index_content(self): ...
async def handle_errors(self): ...
((...))
async def main(self):
try:
while True:
tasks = [t.loop.create_task(t.fetch_pages()),
t.loop.create_task(t.process_html()),
t.loop.create_task(t.analyze_content()),
t.loop.create_task(t.index_content()),
t.loop.create_task(t.handle_errors())]
await asyncio.gather(*tasks)
except KeyboardInterrupt:
print("shutting down")
finally:
print("Pretending to save the URL queue, etc ... ")
t = Crawler()
if __name__ == "__main__":
#asyncio.run(crawler.crawl(index), debug=True)
t.loop.run_until_complete(t.main())
( полный код здесь ) ...
и вот код для цикла извлечения:
class Fetcher():
((...))
def __init__(self, domain_manager=None, http_headers = None, dns_cache_lifetime = 300, request_timeout = 30,
connection_timeout = 5, max_connections = 20, max_connections_per_host = 5, obey_robots = False,
verify_ssl_certs = False):
self.loop = asyncio.get_event_loop()
self.domain_manager = domain_manager # rate limit requests / robots.txt on per-domain basis
self.timeout = aiohttp.ClientTimeout(total=request_timeout,
connect=connection_timeout)
self.connector = aiohttp.TCPConnector(ttl_dns_cache=dns_cache_lifetime,
limit=max_connections,
limit_per_host=max_connections_per_host,
ssl=verify_ssl_certs)
async def fetch(self, url, session):
try:
async with session.get(url) as resp:
status = int(resp.status)
headers = dict(resp.headers)
if self.check_response_headers(url, status, headers):
html = await resp.text()
return {'url': url,
'headers': headers,
'html': html,
'last_visit': datetime.now()}
else:
raise FetchError(f"Fetch failed for url {url}: Header check failed (but why did we make it here?)",
url=url, exception=e, fetch_stage="GET")
except UnicodeDecodeError as e:
((...))
def check_response_headers(self, url, status, headers):
"""Given a response from fetch(), return a (Page object, error object) pair"""
((...))
async def fetch_with_dm(self, url, session, i):
"""fetches next url from queue until successfully fetches a page"""
domain = self.domain_manager.domain_from_url(url)
((...))
async with self.domain_manager.locks[domain]:
((...))
fetch_result = await self.fetch(url, session)
return fetch_result
async def get_batch(self, batch_size, url_queue, page_queue, error_queue):
start_time = datetime.now()
async with aiohttp.ClientSession(timeout=self.timeout, connector=self.connector) as session:
tasks = []
for i in range(batch_size):
url = None
score = None
if url_queue.empty():
break
else:
score, url = url_queue.get_nowait() # should we be blocking here / await / sleeping if no urls in queue?
if url == None:
raise ValueError("Received empty URL")
if score == None:
raise ValueError("Received empty URL score")
tasks.append(self.loop.create_task(self.fetch_with_dm(url, session, i)))
for p in asyncio.as_completed(tasks):
try:
page = await p
page['url_score'] = score
await page_queue.put((score, id(page), page))
except FetchError as fe:
await error_queue.put(fe)
( полный код здесь )
... Опять "сеанс закрыт"ошибка возникает при вызове session.get(url)
в fetch
, но только во второй итерации основного цикла ...