Какой самый быстрый способ отправить 100 000 HTTP-запросов в Python? - PullRequest
228 голосов
/ 13 апреля 2010

Я открываю файл с 100 000 URL. Мне нужно отправить HTTP-запрос на каждый URL и распечатать код состояния. Я использую Python 2.6, и до сих пор смотрел на многие запутанные способы, которыми Python реализует многопоточность / параллелизм. Я даже посмотрел на библиотеку python concurrence , но не могу понять, как правильно написать эту программу. Кто-нибудь сталкивался с подобной проблемой? Я думаю, что в общем мне нужно знать, как выполнить тысячи задач в Python как можно быстрее - я полагаю, это означает «одновременно».

Ответы [ 13 ]

175 голосов
/ 14 апреля 2010

Twistedless решение:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

Это немного быстрее, чем скрученное решение и использует меньше ресурсов процессора.

49 голосов
/ 28 августа 2014

Решение с использованием торнадо асинхронной сетевой библиотеки

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
34 голосов
/ 14 апреля 2010

Темы абсолютно не ответ здесь. Они обеспечат узкие места как для процесса, так и для ядра, а также допустимые пределы пропускной способности, если общая цель - «самый быстрый путь».

Немного twisted и его асинхронный HTTP клиент дадут вам гораздо лучшие результаты.

29 голосов
/ 10 сентября 2017

С 2010 года ситуация немного изменилась, когда она была опубликована, и я не пробовал все остальные ответы, но я пробовал несколько, и я нашел, что это работает лучше всего для меня, используя python3.6.

Мне удалось получить около 150 уникальных доменов в секунду, работающих на AWS.

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
14 голосов
/ 14 июля 2014

Используйте grequests , это комбинация запросов + модуль Gevent.

GRequests позволяет использовать запросы с Gevent, чтобы легко выполнять асинхронные HTTP-запросы.

Простое использование:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

Создать набор неотправленных запросов:

>>> rs = (grequests.get(u) for u in urls)

Отправьте их все одновременно:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
7 голосов
/ 14 апреля 2010

Если вы хотите добиться максимальной производительности, возможно, вы захотите использовать асинхронный ввод-вывод, а не потоки. Накладные расходы, связанные с тысячами потоков ОС, нетривиальны, и переключение контекста в интерпретаторе Python добавляет еще больше. Работа с потоками, безусловно, сделает работу, но я подозреваю, что асинхронный маршрут обеспечит лучшую общую производительность.

В частности, я бы предложил асинхронный веб-клиент в библиотеке Twisted (http://www.twistedmatrix.com).). Он имеет заведомо крутую кривую обучения, но его довольно просто использовать, если вы хорошо разбираетесь в стиле асинхронного программирования Twisted.

Руководство по асинхронному веб-клиенту Twisted доступно по адресу:

http://twistedmatrix.com/documents/current/web/howto/client.html

7 голосов
/ 14 апреля 2010

Хороший подход к решению этой проблемы состоит в том, чтобы сначала написать код, необходимый для получения одного результата, а затем включить многопоточный код для распараллеливания приложения.

В идеальном мире это просто означало бы одновременный запуск 100 000 потоков, которыевыводить их результаты в словарь или список для последующей обработки, но на практике вы ограничены в количестве параллельных HTTP-запросов, которые вы можете выполнить таким способом.Локально, у вас есть ограничения на количество сокетов, которые вы можете открыть одновременно, сколько потоков выполнения ваш интерпретатор Python позволит.Удаленно, вы можете быть ограничены в количестве одновременных подключений, если все запросы направлены к одному серверу или ко многим.Эти ограничения, вероятно, потребуют от вас написания сценария таким образом, чтобы опрашивать только небольшую часть URL-адресов одновременно (100, как упоминалось в другом постере, вероятно, является приличным размером пула потоков, хотя вы можете обнаружить, что выможет успешно развернуть многие другие).

Вы можете использовать этот шаблон проектирования для решения вышеуказанной проблемы:

  1. Запускать поток, который запускает новые потоки запроса, до количества запущенных в данный момент потоков (вы можете отслеживать их с помощью threading.active_count () или путем помещения объектов потока в структуру данных) is> = ваше максимальное количество одновременных запросов (скажем, 100), а затем спит в течение короткого времени ожидания.Этот поток должен завершиться, когда больше нет URL-адресов для обработки.Таким образом, поток будет продолжать просыпаться, запускать новые потоки и спать до тех пор, пока вы не закончите.
  2. Пусть потоки запроса сохранят свои результаты в некоторой структуре данных для последующего извлечения и вывода.Если структура, в которой вы сохраняете результаты, представляет собой list или dict в CPython, вы можете безопасно добавлять или вставлять уникальные элементы из ваших потоков без блокировок , но если вы записываете в файл или требуетепри более сложном взаимодействии данных между потоками вы должны использовать блокировку взаимного исключения для защиты этого состояния от повреждения .

Я бы посоветовал вам использовать threading модуль.Вы можете использовать его для запуска и отслеживания запущенных потоков.Поддержка многопоточности в Python отсутствует, но описание вашей проблемы предполагает, что ее вполне достаточно для ваших нужд.

Наконец, если вы хотите увидеть довольно простое приложение параллельного сетевого приложения, написанное на Python,проверить ssh.py .Это небольшая библиотека, которая использует потоки Python для распараллеливания многих SSH-соединений.Дизайн достаточно близок к вашим требованиям, и вы можете найти его хорошим ресурсом.

5 голосов
/ 14 апреля 2010

Решение:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
1 голос
/ 13 апреля 2010

Использование пула потоков - хороший вариант, который сделает это довольно просто. К сожалению, в python нет стандартной библиотеки, которая делает пулы потоков очень простыми. Но вот достойная библиотека, которая должна помочь вам начать: http://www.chrisarndt.de/projects/threadpool/

Пример кода с их сайта:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

Надеюсь, это поможет.

0 голосов
/ 12 августа 2014

Этот скрученный асинхронный веб-клиент работает довольно быстро.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...