Работает ли multiprocessing.Queue с gevent? - PullRequest
4 голосов
/ 25 сентября 2011

Кто-нибудь знает, что не так с этим кодом? Он просто «грузит» вечно. Нет вывода. «Сайты» - это список из нескольких десятков строк.

num_worker_threads = 30

def mwRegisterWorker():
    while True:
        try:
            print q.get()
        finally:
            pass

q = multiprocessing.JoinableQueue()
for i in range(num_worker_threads):
     gevent.spawn(mwRegisterWorker)

for site in sites:
    q.put(site)

q.join()  # block until all tasks are done

Ответы [ 2 ]

11 голосов
/ 25 сентября 2011

gevent.spawn() создает гринлеты, а не процессы (более того: все гринлеты запускаются в одном потоке ОС). Так что multiprocessing.JoinableQueue здесь не подходит.

gevent основано на кооперативной многозадачности, т.е. до тех пор, пока вы не вызовете функцию блокировки, которая переключается на цикл обработки событий gevent, другие гринлеты не будут работать. Например, conn ниже использует исправления для методов сокета gevent, которые позволяют другим гринлетам запускаться, ожидая ответа от сайта. А без pool.join(), который отдает управление гринлету, который запускает цикл обработки событий, никаких соединений не будет.

Чтобы ограничить параллелизм при отправке запросов нескольким сайтам, вы можете использовать gevent.pool.Pool:

#!/usr/bin/env python
from gevent.pool import Pool
from gevent import monkey; monkey.patch_socket()
import httplib # now it can be used from multiple greenlets

import logging
info = logging.getLogger().info

def process(site):
    """Make HEAD request to the `site`."""
    conn = httplib.HTTPConnection(site)
    try:
        conn.request("HEAD", "/")
        res = conn.getresponse()
    except IOError, e:
        info("error %s reason: %s" % (site, e))
    else:
        info("%s %s %s" % (site, res.status, res.reason))
    finally:
        conn.close()

def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(msg)s")

    num_worker_threads = 2
    pool = Pool(num_worker_threads)    
    sites = ["google.com", "bing.com", "duckduckgo.com", "stackoverflow.com"]*3
    for site in sites:
        pool.apply_async(process, args=(site,))
    pool.join()

if __name__=="__main__":
   main()
3 голосов
/ 26 сентября 2011

Используйте взамен gevent.queue.JoinableQueue. Зеленые потоки (gevent использует их внутренне) не являются ни потоками, ни процессами, а сопрограммой с планированием на уровне пользователя.

...