Получение «RuntimeError: Сессия закрыта» в aiohttp.ClientSession.get (), даже после создания нового диспетчера контекста - PullRequest
0 голосов
/ 10 февраля 2019

Я пишу веб-сканер, используя aiohttp, и моя программа падает с ошибками "RuntimeError: Session is closed" в моем веб-сканере.

Основной цикл проходит первую итерацию, извлекая и обрабатывая все страницы в очереди URL без каких-либо проблем.Но затем, когда он входит fetch_pages() во 2-й итерации основного цикла и делает первый вызов aiohttp.ClientSession.session.get(), он выдает "RuntimeError: Session is closed".

Я не понимаю, почему я получаю эту ошибкупотому что мне кажется, что приведенный ниже код должен создавать новый менеджер контекста aiohttp.ClientSession() каждый раз, когда вызывается указанная ниже функция get_batch(), и закрывать сеанс в конце вызова функции.Но этого не происходит.Может кто-нибудь объяснить мне, почему я получаю эту ошибку?

Я разместил соответствующие части своего кода ниже (я пытался урезать как можно больше, но включил ссылки на полный исходный код ниже).


Вот основной цикл:

class Crawler():

    ((...))

    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.url_queue = URLQueue(maxsize=10000)        # urls are popped from URL queue
        self.page_queue = asyncio.PriorityQueue()       # when fetched, they are placed on page queue for html processing  

    ((...))

    async def fetch_pages(self):
        print("Entering fetch_page()")
        pages, errors = [], []
        if self.url_queue.empty():    
            await asyncio.sleep(1)

        else:
            await self.fetcher.get_batch(self.BATCH_SIZE, self.url_queue, self.page_queue, self.error_queue)

    ((...))

    async def process_html(self): ...
    async def analyze_content(self): ...
    async def extract_links(self): ...
    async def index_content(self): ...
    async def handle_errors(self): ...

    ((...))

    async def main(self):

        try:
            while True:
                tasks = [t.loop.create_task(t.fetch_pages()),
                        t.loop.create_task(t.process_html()),
                        t.loop.create_task(t.analyze_content()),
                        t.loop.create_task(t.index_content()),
                        t.loop.create_task(t.handle_errors())]

                await asyncio.gather(*tasks)

        except KeyboardInterrupt:
            print("shutting down")

        finally:
            print("Pretending to save the URL queue, etc ... ")   

    t = Crawler()

    if __name__ == "__main__":
        #asyncio.run(crawler.crawl(index), debug=True)
        t.loop.run_until_complete(t.main())

( полный код здесь ) ...

и вот код для цикла извлечения:

class Fetcher():

    ((...))

    def __init__(self, domain_manager=None, http_headers = None, dns_cache_lifetime = 300, request_timeout = 30, 
                 connection_timeout = 5, max_connections = 20, max_connections_per_host = 5, obey_robots = False,
                 verify_ssl_certs = False):

        self.loop = asyncio.get_event_loop()

        self.domain_manager = domain_manager    # rate limit requests / robots.txt on per-domain basis

        self.timeout = aiohttp.ClientTimeout(total=request_timeout, 
                                             connect=connection_timeout)  

        self.connector = aiohttp.TCPConnector(ttl_dns_cache=dns_cache_lifetime, 
                                              limit=max_connections, 
                                              limit_per_host=max_connections_per_host,
                                              ssl=verify_ssl_certs)


    async def fetch(self, url, session):
        try:
            async with session.get(url) as resp:                
                status = int(resp.status)
                headers = dict(resp.headers)        

                if self.check_response_headers(url, status, headers):

                    html = await resp.text()

                    return {'url': url,
                            'headers': headers,
                            'html': html,
                            'last_visit': datetime.now()}
                else:
                    raise FetchError(f"Fetch failed for url {url}: Header check failed (but why did we make it here?)", 
                                     url=url, exception=e, fetch_stage="GET")

        except UnicodeDecodeError as e:
       ((...))


    def check_response_headers(self, url, status, headers):
        """Given a response from fetch(), return a (Page object, error object) pair"""

       ((...))


    async def fetch_with_dm(self, url, session, i):
        """fetches next url from queue until successfully fetches a page"""

        domain = self.domain_manager.domain_from_url(url)

        ((...))

        async with self.domain_manager.locks[domain]:

            ((...))

            fetch_result = await self.fetch(url, session)

            return fetch_result


    async def get_batch(self, batch_size, url_queue, page_queue, error_queue):
        start_time = datetime.now()

        async with aiohttp.ClientSession(timeout=self.timeout, connector=self.connector) as session:
            tasks = []
            for i in range(batch_size):
                url = None          
                score = None

                if url_queue.empty():
                    break

                else:
                    score, url = url_queue.get_nowait()  # should we be blocking here / await / sleeping if no urls in queue?

                    if url == None:
                        raise ValueError("Received empty URL")

                    if score == None:
                        raise ValueError("Received empty URL score")

                    tasks.append(self.loop.create_task(self.fetch_with_dm(url, session, i)))


            for p in asyncio.as_completed(tasks):
                try:
                    page = await p
                    page['url_score'] = score
                    await page_queue.put((score, id(page), page))

                except FetchError as fe:
                    await error_queue.put(fe)

( полный код здесь )

... Опять "сеанс закрыт"ошибка возникает при вызове session.get(url) в fetch, но только во второй итерации основного цикла ...

...