Я занимаюсь разработкой сканера и хотел бы использовать asyncio
для асинхронного сканирования ссылок, чтобы повысить производительность. Я уже использовал Celery
в своем синхронном сканере, что позволяет мне запускать несколько сканеров параллельно. Однако сам сканер является syn c, поэтому производительность низкая. Я перепроектировал свой код с использованием asyncio
и определил новый сканер как задачу Celery
, но сканер получает сетевые ошибки без видимой причины, что заставляет меня думать, возможно ли даже использовать сопрограмму в качестве Celery
задача? Ответ на этот вопрос поможет мне выяснить, есть ли проблема с моим кодом или есть несовместимость со стеком, который я использую.
async def url_helper(url):
return url.html.absolute_links
async def url_worker(session, queue, urls, request_count):
while True:
# Get a "work item" out of the queue.
try:
current = await queue.get()
except asyncio.QueueEmpty:
return
# Stay in domain
if not config.regex_url.match(current):
# Notify the queue that the "work item" has been processed.
queue.task_done()
return
# Check against the desired level of depth
depth = urls.get(current, 0)
if depth == config.max_depth:
logger.info("Out of depth: {}".format(current))
# Notify the queue that the "work item" has been processed.
queue.task_done()
return
# Get all URLs from the page
try:
logger.info("current link: {}".format(current))
resp = await session.get(
current, allow_redirects=True, timeout=15
)
new_urls = await url_helper(resp)
request_count += 1
except TimeoutError as e:
logger.exception(e)
# Notify the queue that the "work item" has been processed.
queue.task_done()
return
except OSError as e:
logger.exception(e)
# Notify the queue that the "work item" has been processed.
queue.task_done()
return
# Add URLs to queue if internal and not already in urls
for link in new_urls:
if config.regex_url.match(link):
if link not in urls:
urls[link] = depth + 1
await queue.put(link)
# Notify the queue that the "work item" has been processed.
queue.task_done()
async def url_crawler(url):
"""
Crawls.
"""
# Set time
start = time.time()
http_asession = requests_html.AsyncHTMLSession()
logger.debug("Sending GET request to start_url...")
start_response = await http_asession.get(url=url)
logger.debug("Received {} response.".format(start_response.status_code))
request_count = 1
# Dict to hold all URLs on the website
urls = {}
# List containing all URLs from the start page
start_urls = list(start_response.html.absolute_links)
logger.info("start urls: %s", start_urls)
# A queue to store our to-be-visited urls
queue = asyncio.Queue()
# Max concurrency
max_workers = config.max_concurrency
# Put urls from start page in the queue
for url in start_urls:
await queue.put(url)
# Create three worker tasks to process the queue concurrently.
coros = asyncio.gather(
*[
url_worker(
queue=queue,
session=http_asession,
urls=urls,
request_count=request_count,
)
for i in range(max_workers)
]
)
await coros
await http_asession.close()
logger.info("Crawled {} links.".format(len(urls)))
logger.info(urls)
logger.debug("Made {} HTTP requests.".format(request_count))
finish = time.time()
logger.info("Execution time: {}".format(finish - start))
@celery_app.task
def run_crawler(url):
loop = asyncio.get_event_loop()
loop.run_until_complete(url_crawler(url))
loop.close()