Многопоточность в python. Библиотека потоковой передачи - PullRequest
0 голосов
/ 08 мая 2020

Я пытаюсь ускорить работу своего веб-скрейпера, реализуя многопоточность с использованием библиотеки потоков, но время выполнения не быстрее, чем при использовании однопоточного. Я попытался настроить craper, чтобы вы могли ввести количество страниц, которые вы хотите очистить, и / или количество переходов от исходного начального числа до go до.

import sys
import queue
import requests
import time
import os
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser
import matplotlib.pyplot as plt
import threading
import statistics
import concurrent.futures

runtimes = []


def crawler(frontier, req_pages, req_hops, output_directory):
    f = open("report.txt", "w")
    f.write("")
    f.close()

    #multithreaded#
    with concurrent.futures.ThreadPoolExecutor(max_workers = 10) as executor:
        executor.submit(crawler_thread, frontier, req_pages, req_hops, output_directory)

    #single threaded#
    # crawler_thread(frontier, req_pages, req_hops, output_directory)



def crawler_thread(frontier, req_pages, req_hops, output_directory):

    while not frontier.empty() and req_pages > 0  and req_hops > 0:

        start_time0 = time.perf_counter()

        try:

            url = frontier.get()
            #print("Trying to Crawl: ", url)
            if url == '*': # denotes end of current hop has been hit
                req_hops = req_hops - 1
                frontier.put('*') # mark end of next hop
            elif crawl_permitted(url):
                parsed_url = urlparse(url)
                filename = parsed_url.netloc[4:] if parsed_url.netloc.find('www') == 0 else parsed_url.netloc
                filename = filename + parsed_url.path.replace('/', '-')

                if not already_saved(filename, output_directory):
                    response = requests.get(url, verify = False)
                    time.sleep(3)  #sleep for a few seconds for politeness. To do : implement politeness windows based on the host's crawl_delay specified robots.txt
                    open(output_directory + '/' + filename, 'w', encoding='utf-8').write(response.text)     
                    req_pages = req_pages -1

                    soup = BeautifulSoup(response.text, 'html.parser')  
                    for link in soup.find_all('a'):

                        cleaned_link = clean_link(hyperlink = link.get('href'), website = url) 
                        if cleaned_link: 
                            frontier.put(cleaned_link)


        except:
            f = open("report.txt", "a")
            f.write("crawler failed on url: " + url) + "\n"
            f.close()

        end_time0 = time.perf_counter()
        runtimes.append(end_time0-start_time0)
        # refresh pages, robots.txt, politeness window - to do

#create url queue (frontier) parse argv to pass into crawler 
frontier = seeds_to_frontier(seeds_filename = sys.argv[1])
req_pages = int(sys.argv[2])
req_hops = int(sys.argv[3])
output_directory = sys.argv[4]

start_time1 = time.perf_counter()

#crawl
crawler(frontier, req_pages, req_hops, output_directory)

end_time1 = time.perf_counter()

f = open("report.txt", "a")
f.write("Total Scrape Time: " +  str(end_time1-start_time1) + "\n")
f.write("Average Scrape Time for Each Url: " + str(statistics.mean(runtimes)) + "\n")  
f.close()

plt.plot(runtimes)
plt.ylabel("execution time (s)")
plt.xlabel("url queue iteration")
plt.title("scrapetime for each url")
plt.savefig("execution_time_of_scraped_urls")

Вот дополнительные функции, которые я использую в приведенном выше коде, если вы хотите их увидеть (но я не думаю, что они являются причиной моей ошибки):

def seeds_to_frontier(seeds_filename):
    seeds_file = open(seeds_filename, 'r') 

    frontier = queue.Queue()
    for page in seeds_file:
        frontier.put(page.strip('\n'))
    frontier.put('*') # marks end of hop 0

    seeds_file.close()
    return frontier

def crawl_permitted(url):
    rp = RobotFileParser()

    parsed_url = urlparse(url)
    filename = parsed_url.netloc.rsplit('.', 2)[-2] + '.' + parsed_url.netloc.rsplit('.', 2)[-1]
    robots_url = parsed_url.scheme + '://' + filename + '/robots.txt'

    if not already_saved(filename, 'robots'):
        response = requests.get(robots_url)
        open('robots/' + filename, 'w', encoding='utf-8').write(response.text)

    rp.set_url(robots_url)
    rp.read()   
    return rp.can_fetch('*', url)


def url_delay(url):
    rp = RobotFileParser()

    #set up url for robotfileparser
    parsed_url = urlparse(url)
    filename = parsed_url.netloc.rsplit('.', 2)[-2] + '.' + parsed_url.netloc.rsplit('.', 2)[-1]
    robots_url = parsed_url.scheme + '://' + filename + '/robots.txt'

    #parse the robots.txt and extract crawl_delay
    rp.set_url(robots_url)
    rp.read()
    crawl_delay = rp.crawl_delay("*") if rp.crawl_delay("*") else 0
    return crawl_delay


def already_saved(file, directory):
    for root, dirs, files in os.walk(directory):
        if file in files:
            return 1
    return 0

def clean_link(hyperlink, website): 
    parsed_website = urlparse(website)
    parsed_hyperlink = urlparse(hyperlink)
    cleaned_link = hyperlink

    if not parsed_hyperlink.scheme and not parsed_hyperlink.netloc and not parsed_hyperlink.path and not parsed_hyperlink.params \
        and not parsed_hyperlink.query and not parsed_hyperlink.fragment:
        return ''

    if parsed_hyperlink.scheme and parsed_hyperlink.scheme != 'http':
        return '' 

    if parsed_hyperlink.path == '/': # duplicate - self references
        return ''

    if parsed_hyperlink.fragment: # duplicate - bookmarks
        return ''

    # avoid pdfs, images,  # to do: The below solution is not the best to avoid all media
    if len(parsed_hyperlink.path) > 4 and (parsed_hyperlink.path[-4:] in [".pdf", ".jpg", ".png", ".svg", ".jpeg"]): 
        return ''

    # take care of invalid characters - to do

    if not (".edu" in parsed_hyperlink.netloc):# only crawl edu pages
        return ''

    if not parsed_hyperlink.netloc: 
        if parsed_hyperlink.path[0] == '/':
            cleaned_link = parsed_website.scheme + '://' + parsed_website.netloc + hyperlink
        else:
            # bug - hyperlink 'smi.ucr.edu' parsed as path and not netloc - ???
            if parsed_hyperlink.path[0:3] == 'tel:': # remove incorrectly formatted telephone links
                print('incorrect tel link: ' + hyperlink)
                return ''
            # double check
            cleaned_link = website + hyperlink

    return cleaned_link

Однопоточный код работает нормально и очищает страницу в среднем за 0,7 секунды. Многопоточный и скребет с такой же скоростью ??? То, как я изначально установил это (показанное выше, первая ячейка), похоже, запускало большой l oop в одном потоке. Я попытался отредактировать (код в ячейке ниже), чтобы исправить это.

def crawler(frontier, req_pages, req_hops, output_directory):
    f = open("report.txt", "w")
    f.write("")
    f.close()

    #multithreaded#
    # with concurrent.futures.ThreadPoolExecutor(max_workers = 10) as executor:
    #   executor.submit(crawler_thread, frontier, req_pages, req_hops, output_directory)

    #single threaded#
    # crawler_thread(frontier, req_pages, req_hops, output_directory)


    #testing stuff

    while not frontier.empty() and req_pages > 0  and req_hops > 0:
        with concurrent.futures.ThreadPoolExecutor(max_workers = 10) as executor:
            executor.submit(crawler_thread, frontier, req_pages, req_hops, output_directory)


def crawler_thread(frontier, req_pages, req_hops, output_directory):

    lock = threading.Lock()

    lock.acquire()
    if req_pages > 0  and req_hops > 0:

        lock.release()

        start_time0 = time.perf_counter()

        try:

            url = frontier.get()
            print("Trying to Crawl: ", url)
            if url == '*': # denotes end of current hop has been hit
                lock.acquire()
                req_hops = req_hops - 1
                lock.release()
                frontier.put('*') # mark end of next hop
            elif crawl_permitted(url):
                parsed_url = urlparse(url)
                filename = parsed_url.netloc[4:] if parsed_url.netloc.find('www') == 0 else parsed_url.netloc
                filename = filename + parsed_url.path.replace('/', '-')

                if not already_saved(filename, output_directory):
                    response = requests.get(url, verify = False)
                    #time.sleep(10)  #sleep for a few seconds for politeness. To do : implement politeness windows based on the host's crawl_delay specified robots.txt
                    open(output_directory + '/' + filename, 'w', encoding='utf-8').write(response.text) 
                    lock.acquire()
                    req_pages = req_pages -1
                    lock.release()

                    soup = BeautifulSoup(response.text, 'html.parser')  
                    for link in soup.find_all('a'):

                        cleaned_link = clean_link(hyperlink = link.get('href'), website = url) 
                        if cleaned_link: 
                            frontier.put(cleaned_link)


        except:
            f = open("report.txt", "a")
            f.write("crawler failed on url: " + url + "\n")
            f.close()

        end_time0 = time.perf_counter()
        runtimes.append(end_time0-start_time0)
        # refresh pages, robots.txt, politeness window - to do

#create url queue (frontier) parse argv to pass into crawler 
frontier = seeds_to_frontier(seeds_filename = sys.argv[1])
req_pages = int(sys.argv[2])
req_hops = int(sys.argv[3])
output_directory = sys.argv[4]

start_time1 = time.perf_counter()

#crawl
crawler(frontier, req_pages, req_hops, output_directory)

end_time1 = time.perf_counter()

f = open("report.txt", "a")
f.write("Total Scrape Time: " +  str(end_time1-start_time1) + "\n")
f.write("Average Scrape Time for Each Url: " + str(statistics.mean(runtimes)) + "\n")  
f.close()

print("Total Scrape Time: " +  str(end_time1-start_time1) + "\n")

plt.plot(runtimes)
plt.ylabel("execution time (s)")
plt.xlabel("url queue iteration")
plt.title("scrapetime for each url")
plt.savefig("execution_time_of_scraped_urls")

1) Я не уверен, что это ускоряется. 2) Когда я пытаюсь указать остановку после очистки 10 страниц, скребок продолжает работать, пока я не выхожу из него принудительно. В моей первой версии (ячейка 1) этого не было.

Что не так с моей первой реализацией и второй реализацией? Первый вообще не кажется многопоточным из-за скорости очистки. Второй (я еще не уверен в скорости очистки) не останавливается на параметре req_page. Где я ошибаюсь в своей попытке многопоточности? Моя память повреждена?

...