For anyone who comes across this page: I was able to figure this out myself.

On the advice of @brad-solomon I switched from ProcessPoolExecutor to ThreadPoolExecutor to manage the concurrent aspects of this script (see his comment for details).

Relative to the original question, the key was using the add_done_callback method of the Future objects returned by ThreadPoolExecutor.submit, in combination with a modified Scraper.scrape and a new method, CrawlManager.proc_scraper_results, as shown below:
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
class Scraper:
    """
    Scrapes a single url
    """

    def __init__(self, url):
        self.url = url  # url of the page to scrape
        self.internal_urls = None
        self.content = None
        # note: scrape() is not called here; the executor calls it,
        # so the work runs on a worker thread rather than the main thread

    def scrape(self):
        """
        Method(s) to request a page, scrape links from that page
        to other pages, and finally scrape actual content from the current page
        """
        # assume that code in this method would yield the urls linked from the current page
        self.internal_urls = set(scraped_urls)
        # and that code in this method would scrape a bit of actual content
        self.content = {'content1': content1, 'content2': content2, 'etc': etc}
        # these three items are returned and will be passed to the callback
        # function inside a Future object
        return self.internal_urls, self.url, self.content
class CrawlManager:
    """
    Manages a multithreaded crawl and scrape of a single website
    """

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.pool = ThreadPoolExecutor(max_workers=10)
        self.processed_urls = set()
        self.queued_urls = Queue()
        self.queued_urls.put(self.seed_url)
        self.data = {}

    def proc_scraper_results(self, future):
        # get the items of interest from the finished future
        # (result() returns what scrape() returned, or re-raises its exception)
        internal_urls, url, content = future.result()
        # assign scraped data/content
        self.data[url] = content
        # also add scraped links to the queue if they
        # aren't already queued or already processed
        for link_url in internal_urls:
            if link_url not in self.queued_urls.queue and link_url not in self.processed_urls:
                self.queued_urls.put(link_url)
    def crawl(self):
        while True:
            try:
                # get a url from the queue; give in-flight jobs up to 60 s
                # to finish and enqueue any new links before giving up
                target_url = self.queued_urls.get(timeout=60)
                # check that the url hasn't already been processed
                if target_url not in self.processed_urls:
                    # add the url to the processed set
                    self.processed_urls.add(target_url)
                    print(f'Processing url {target_url}')
                    # submit a job to the ThreadPoolExecutor
                    # (note: unlike the original question, we pass a method, not an object)
                    job = self.pool.submit(Scraper(target_url).scrape)
                    # add_done_callback takes another callable, here a CrawlManager method;
                    # when the job finishes, that callable is passed the finished Future
                    job.add_done_callback(self.proc_scraper_results)
            except Empty:
                print("All done.")
                return
            except Exception as e:
                print(e)


if __name__ == '__main__':
    crawler = CrawlManager('www.mywebsite.com')
    crawler.crawl()
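
Since the scraping internals above are only sketched, here is a minimal, self-contained illustration of the same submit / add_done_callback / result() pattern on its own; the fake_scrape and handle_result names are made up purely for this example:

from concurrent.futures import ThreadPoolExecutor

def fake_scrape(url):
    # stand-in for the real request/parse work
    return url, f'content of {url}'

def handle_result(future):
    # add_done_callback hands us the finished Future;
    # result() returns whatever fake_scrape returned (or re-raises its exception)
    url, content = future.result()
    print(url, '->', content)

with ThreadPoolExecutor(max_workers=4) as pool:
    for u in ('page1', 'page2', 'page3'):
        pool.submit(fake_scrape, u).add_done_callback(handle_result)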
The result of these changes is a very significant reduction in this program's runtime.
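
To check the speed-up on your own site (once the placeholder scraping code is filled in), a rough timing sketch might look like the following; the URL is a placeholder, and the 60-second queue timeout will be included in the measured time:

import time

start = time.perf_counter()
CrawlManager('http://www.mywebsite.com').crawl()  # placeholder URL
print(f'Crawl finished in {time.perf_counter() - start:.1f}s')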