Python - несколько одновременных потоков - PullRequest
5 голосов
/ 18 мая 2011

Я пишу веб-скребок на python, используя httplib2 и lxml (да - я знаю, что мог бы использовать scrapy. Давайте пройдем мимо этого ...) У скребка есть около 15000 страниц для разбора примерно на 400 000 элементов.У меня есть код для синтаксического анализа элементов, которые запускаются мгновенно (почти), но часть, которая загружает страницу с сервера, все еще очень медленная.Я хотел бы преодолеть это через параллелизм.Тем не менее, я не могу полагаться на КАЖДУЮ страницу, которую нужно анализировать КАЖДЫЙ раз.Я пытался с одним ThreadPool (например, multiprocessing.pool, но сделал с потоками - что должно быть хорошо, так как это процесс, связанный с вводом / выводом), но я не мог придумать изящный (или работающий) способ полученияВСЕ потоки останавливаются, когда дата последнего элемента индекса была больше, чем элемент, который мы обрабатывали.Прямо сейчас я работаю над методом, использующим два экземпляра ThreadPool - один для загрузки каждой страницы, а другой для анализа страниц.Пример упрощенного кода:

#! /usr/bin/env python2

import httplib2
from Queue import PriorityQueue
from multiprocessing.pool import ThreadPool
from lxml.html import fromstring

pages = [x for x in range(1000)]
page_queue = PriorityQueue(1000)

url = "http://www.google.com"

def get_page(page):
    #Grabs google.com
    h = httplib2.Http(".cache")
    resp, content = h.request(url, "GET")
    tree = fromstring(str(content), base_url=url)
    page_queue.put((page, tree))
    print page_queue.qsize()

def parse_page():
    page_num, page = page_queue.get()
    print "Parsing page #" + str(page_num)
    #do more stuff with the page here
    page_queue.task_done()

if __name__ == "__main__":
    collect_pool = ThreadPool()
    collect_pool.map_async(get_page, pages)
    collect_pool.close()

    parse_pool = ThreadPool()
    parse_pool.apply_async(parse_page)
    parse_pool.close()


     parse_pool.join()
     collect_pool.join()
     page_queue.join()

Запуск этого кода, однако, не делает то, что я ожидаю - то есть запускать два пула потоков: один заполняет очередь, а другой вытягивает ее для анализа.Он запускает пул сбора и проходит через него, а затем запускает parse_pool и проходит через него (я полагаю, я не позволил коду работать достаточно долго, чтобы добраться до parse_pool - дело в том, что collect_pool - это все, что кажется работающим).Я вполне уверен, что что-то напутал с порядком вызовов join (), но я не могу на всю жизнь понять, в каком порядке они должны быть. Мой вопрос, по сути, такой:Я лаю здесь правильное дерево?и если да, то, что, черт возьми, я делаю не так?Если я не - что бы вы предложили

1 Ответ

7 голосов
/ 20 июня 2011

Прежде всего, ваш дизайн кажется правильным на высоком уровне.Использование пула потоков для сбора страниц оправдано синхронной природой модуля httlib2.(С асинхронной библиотекой одного потока было бы достаточно; обратите внимание, что даже с httplib2 и пулом в любое время из-за GIL работает не более одного потока сборщика.) Пул синтаксического анализа оправдывается модулем lxml, написанным на C /C ++ (и при условии, что таким образом Global Interpreter Lock освобождается во время синтаксического анализа страницы - это нужно проверить в документации или коде lxml!).Если бы это последнее не было правдой, то при наличии выделенного пула синтаксического анализа не было бы никакого выигрыша в производительности, поскольку только один поток мог бы получить GIL.В этом случае было бы лучше использовать пул процессов.

Я не знаком с реализацией ThreadPool, но я предполагаю, что она аналогична классу Pool в многопроцессорном модуле.Исходя из этого, проблема заключается в том, что вы создаете только один рабочий элемент для parse_pool и после того, как parse_page обрабатывает первую страницу, он никогда не пытается удалить из нее другие страницы.Дополнительные рабочие элементы также не передаются в этот пул, поэтому обработка останавливается, и после вызова parse_pool.close () потоки (пустого) пула завершаются.

Решение состоит в том, чтобы исключить page_queue.Функция get_page () должна поместить рабочий элемент в parse_pool, вызывая apply_async () для каждой собираемой страницы, вместо того, чтобы вводить их в page_queue.

Основной поток должен ждать, пока collect_queue не станет пустым (т.е.Вызов функции collect_pool.join () возвращен), затем он должен закрыть parse_pool (поскольку мы можем быть уверены, что для парсера больше не будет выполнено никаких работ).Затем следует дождаться, пока parse_pool станет пустым, вызвав parse_pool.join (), а затем завершить работу.

Кроме того, вам нужно увеличить количество потоков в connect_pool для одновременной обработки большего количества http-запросов.Количество потоков в пуле по умолчанию - это количество процессоров;в настоящее время вы не можете выдавать больше, чем столько запросов.Вы можете экспериментировать со значениями до тысяч или десятков тысяч;наблюдать за загрузкой процессора в пуле;он не должен приближаться к 1 процессору.

...