Различия в пропускной способности при использовании сопрограмм против потоков - PullRequest
5 голосов
/ 12 февраля 2012

Несколько дней назад я задал вопрос на SO о помощи в разработке парадигмы для структурирования нескольких HTTP-запросов

Вот сценарий. Я хотел бы иметь систему с несколькими производителями, с несколькими потребителями. Мои производители сканируют и очищают несколько сайтов и добавляют найденные ссылки в очередь. Поскольку я буду сканировать несколько сайтов, я хотел бы иметь несколько производителей / сканеров.

Потребители / работники получают данные из этой очереди, отправляют запросы TCP / UDP на эти ссылки и сохраняют результаты в моей базе данных Django. Я также хотел бы иметь несколько рабочих, так как каждый элемент очереди полностью независим друг от друга.

Люди предложили использовать библиотеку сопрограмм для этого, т. Е. Gevent или Eventlet. Никогда не работая с сопрограммами, я прочитал, что, хотя парадигма программирования похожа на многопоточные парадигмы, только один поток активно выполняется, но когда происходят блокирующие вызовы, такие как вызовы ввода-вывода, стеки переключаются в памяти, а другие зеленые поток вступает во владение, пока не встретит какой-то блокирующий вызов ввода / вывода. Надеюсь, я понял это правильно? Вот код из одного из моих SO сообщений:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

Это хорошо работает, потому что sleep вызовы блокируют вызовы, и когда происходит событие sleep, другой зеленый поток вступает во владение. Это намного быстрее, чем последовательное выполнение. Как вы можете видеть, в моей программе нет кода, который бы преднамеренно передавал выполнение одного потока другому потоку. Я не вижу, как это вписывается в приведенный выше сценарий, так как я хотел бы, чтобы все потоки выполнялись одновременно.

Все работает нормально, но я чувствую, что пропускная способность, которую я достиг с помощью Gevent / Eventlets, выше, чем у исходной последовательно запускаемой программы, но значительно ниже, чем то, что можно было бы достичь с помощью реальной многопоточности.

Если бы я заново реализовал свою программу, используя механизмы потоков, каждый из моих производителей и потребителей мог бы одновременно работать без необходимости обмениваться стеками, как сопрограммы.

Должно ли это быть повторно реализовано с использованием потоков? Мой дизайн не так? Мне не удалось увидеть реальные преимущества использования сопрограмм.

Может быть, мои понятия немного грязные, но это то, что я усвоил. Любая помощь или разъяснение моей парадигмы и понятий была бы отличной.

Спасибо

Ответы [ 3 ]

5 голосов
/ 12 февраля 2012

Как видите, в моей программе нет такого кода, который нарочно дает выполнение одного потока другому потоку. Я не вижу как это вписывается в сценарий выше, как я хотел бы иметь все потоки, выполняющиеся одновременно.

Существует один поток ОС, но несколько гринлетов. В вашем случае gevent.sleep() позволяет работникам выполнять одновременно. Блокировка вызовов IO, таких как urllib2.urlopen(url).read(), делает то же самое, если вы используете urllib2, исправленный для работы с gevent (вызывая gevent.monkey.patch_*()).

См. Также Любопытный курс по сопрограммам и параллелизму , чтобы понять, как код может работать одновременно в однопоточной среде.

Для сравнения различий в пропускной способности между Gevent, Threading и Multiprocessing вы можете написать код, совместимый со всеми подходами:

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue

Остальная часть сценария одинакова для всех реализаций параллелизма:

def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))

Не выполнять код на уровне модуля, вставить его в main():

def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait while it is empty
    # exit main thread (daemon workers die at this point)

if __name__=="__main__":    
   main()
1 голос
/ 12 февраля 2012

gevent отлично подходит, когда у вас очень много (зеленых) потоков.Я проверил это с тысячами, и это работало очень хорошо.Вы должны убедиться, что все библиотеки, которые вы используете как для очистки, так и для сохранения в БД, становятся зелеными.afaik, если они используют сокет Python, инъекция Gevent должна работать.однако расширения, написанные на C (например, mysqldb), могут блокироваться, и вместо этого вам нужно будет использовать зеленые эквиваленты.

Если вы используете gevent, вы в основном можете покончить с очередями, порождая новый (зеленый) поток для каждой задачи,код для потока так же просто, как db.save(web.get(address)).gevent позаботится о вытеснении, когда какая-то библиотека в БД или веб-блоках.это будет работать до тех пор, пока ваши задачи помещаются в память.

0 голосов
/ 12 февраля 2012

В этом случае ваша проблема не в скорости программы (т. Е. Выбор gevent или многопоточности), а в пропускной способности сетевого ввода-вывода. Это (должно быть) узкое место, которое определяет скорость выполнения программы.

Gevent - это хороший способ убедиться, что является узким местом, а не архитектурой вашей программы.

Это тот процесс, который вам нужен:

import gevent
from gevent.queue import Queue, JoinableQueue
from gevent.monkey import patch_all


patch_all()  # Patch urllib2, etc


def worker(work_queue, output_queue):
    for work_unit in work_queue:
        finished = do_work(work_unit)
        output_queue.put(finished)
        work_queue.task_done()


def producer(input_queue, work_queue):
    for url in input_queue:
        url_list = crawl(url)
        for work in url_list:
            work_queue.put(work)
        input_queue.task_done()


def do_work(work):
    gevent.sleep(0)  # Actually proces link here
    return work


def crawl(url):
    gevent.sleep(0)
    return list(url)  # Actually process url here

input = JoinableQueue()
work = JoinableQueue()
output = Queue()

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)]
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)]


list_of_urls = ['foo', 'bar']

for url in list_of_urls:
    input.put(url)

# Wait for input to finish processing
input.join()
print 'finished producing'
# Wait for workers to finish processing work
work.join()
print 'finished working'

# We now have output!
print 'output:'
for message in output:
    print message
# Or if you'd like, you could use the output as it comes!

Вам не нужно ждать окончания ввода и рабочих очередей, я только что продемонстрировал это здесь.

...