Можно ли определить функцию asyn c как задачу Celery? - PullRequest
0 голосов
/ 09 апреля 2020

Я занимаюсь разработкой сканера и хотел бы использовать asyncio для асинхронного сканирования ссылок, чтобы повысить производительность. Я уже использовал Celery в своем синхронном сканере, что позволяет мне запускать несколько сканеров параллельно. Однако сам сканер является syn c, поэтому производительность низкая. Я перепроектировал свой код с использованием asyncio и определил новый сканер как задачу Celery, но сканер получает сетевые ошибки без видимой причины, что заставляет меня думать, возможно ли даже использовать сопрограмму в качестве Celery задача? Ответ на этот вопрос поможет мне выяснить, есть ли проблема с моим кодом или есть несовместимость со стеком, который я использую.

async def url_helper(url):
    return url.html.absolute_links

async def url_worker(session, queue, urls, request_count):
    while True:
        # Get a "work item" out of the queue.
        try:
            current = await queue.get()
        except asyncio.QueueEmpty:
            return

        # Stay in domain
        if not config.regex_url.match(current):
            # Notify the queue that the "work item" has been processed.
            queue.task_done()
            return

        # Check against the desired level of depth
        depth = urls.get(current, 0)
        if depth == config.max_depth:
            logger.info("Out of depth: {}".format(current))
            # Notify the queue that the "work item" has been processed.
            queue.task_done()
            return

        # Get all URLs from the page
        try:

            logger.info("current link: {}".format(current))
            resp = await session.get(
                current, allow_redirects=True, timeout=15
            )
            new_urls = await url_helper(resp)
            request_count += 1
        except TimeoutError as e:
            logger.exception(e)
            # Notify the queue that the "work item" has been processed.
            queue.task_done()
            return
        except OSError as e:
            logger.exception(e)
            # Notify the queue that the "work item" has been processed.
            queue.task_done()
            return

        # Add URLs to queue if internal and not already in urls
        for link in new_urls:
            if config.regex_url.match(link):
                if link not in urls:
                    urls[link] = depth + 1
                    await queue.put(link)
                    # Notify the queue that the "work item" has been processed.
                    queue.task_done()


async def url_crawler(url):
    """
        Crawls.
    """
    # Set time
    start = time.time()

    http_asession = requests_html.AsyncHTMLSession()
    logger.debug("Sending GET request to start_url...")
    start_response = await http_asession.get(url=url)
    logger.debug("Received {} response.".format(start_response.status_code))

    request_count = 1

    # Dict to hold all URLs on the website
    urls = {}

    # List containing all URLs from the start page
    start_urls = list(start_response.html.absolute_links)
    logger.info("start urls: %s", start_urls)

    # A queue to store our to-be-visited urls
    queue = asyncio.Queue()
    # Max concurrency
    max_workers = config.max_concurrency

    # Put urls from start page in the queue
    for url in start_urls:
        await queue.put(url)

    # Create three worker tasks to process the queue concurrently.
    coros = asyncio.gather(
        *[
            url_worker(
                queue=queue,
                session=http_asession,
                urls=urls,
                request_count=request_count,
            )
            for i in range(max_workers)
        ]
    )
    await coros

    await http_asession.close()

    logger.info("Crawled {} links.".format(len(urls)))
    logger.info(urls)
    logger.debug("Made {} HTTP requests.".format(request_count))

    finish = time.time()
    logger.info("Execution time: {}".format(finish - start))

@celery_app.task
def run_crawler(url):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(url_crawler(url))
    loop.close()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...