исключение торнадо AsyncHTTPClient.fetch - PullRequest
1 голос
/ 09 октября 2011

Я использую tornado.httpclient.AsyncHTTPClient.fetch для получения доменов из списка.Когда я помещаю домены для выборки с большим интервалом (например, 500), все работает хорошо, но когда я уменьшаю инерцию до 100, время от времени возникает следующее исключение:


Traceback (most recent call last):
  File "/home/crchemist/python-2.7.2/lib/python2.7/site-packages/tornado/simple_httpclient.py", line 289, in cleanup
    yield
  File "/home/crchemist/python-2.7.2/lib/python2.7/site-packages/tornado/stack_context.py", line 183, in wrapped
    callback(*args, **kwargs)
  File "/home/crchemist/python-2.7.2/lib/python2.7/site-packages/tornado/simple_httpclient.py", line 384, in _on_chunk_length
    self._on_chunk_data)
  File "/home/crchemist/python-2.7.2/lib/python2.7/site-packages/tornado/iostream.py", line 180, in read_bytes
    self._check_closed()
  File "/home/crchemist/python-2.7.2/lib/python2.7/site-packages/tornado/iostream.py", line 504, in _check_closed
    raise IOError("Stream is closed")
IOError: Stream is closed

Что может быть причинойэто поведение?Код выглядит так:


def fetch_domain(domain):
    http_client = AsyncHTTPClient()
    request = HTTPRequest('http://' + domain,
       user_agent=CRAWLER_USER_AGENT)
    http_client.fetch(request, handle_domain)


class DomainFetcher(object):
    def __init__(self, domains_iterator):
        self.domains = domains_iterator

    def __call__(self):
        try:
            domain = next(self.domains)
        except StopIteration:
            domain_generator.stop()
            ioloop.IOLoop.instance().stop()
        else:
            fetch_domain(domain)

domain_generator = ioloop.PeriodicCallback(DomainFetcher(domains), 500)
domain_generator.start()

Ответы [ 2 ]

2 голосов
/ 14 октября 2011

обратите внимание, что tornado.ioloop.PeriodicCallback занимает время цикла в целых мс , в то время как объект HTTPRequest настроен на connect_timeout и / или request_timeout с плавающей запятой секунд ( см. Документ ).

" Пользователи, просматривающие Интернет, считают, что ответы" мгновенные ", если задержка менее 100 мс от клика до ответа " ( из википедии ) См. этот вопрос об ошибке сервера для нормальная задержка значения .

IOError: Stream is closed действительно вызывается для информирования вас о том, что время соединения истекло без завершения, или, точнее, вы вызвали обратный вызов вручную для еще не открытого канала. Это хорошо, так как задержка не превышает 100 мс; если вы ожидаете, что ваши выборки будут завершены надежно, вы должны повысить это значение.

Как только вы установили тайм-аут на что-то вменяемое, рассмотрите возможность обернуть ваши выборки в цикл повторной попытки / исключения, поскольку это обычное исключение , которое вы можете ожидать при работе . Только будьте осторожны, чтобы установить предел повторных попыток!


Поскольку вы используете асинхронную среду, почему бы не позволить ей обрабатывать сам асинхронный обратный вызов вместо выполнения указанного обратного вызова через фиксированный интервал? Epoll / kqueue эффективны и поддерживаются этой платформой.

import ioloop

def handle_request(response):
    if response.error:
        print "Error:", response.error
    else:
        print response.body
    ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
http_client.fetch("http://www.google.com/", handle_request)
ioloop.IOLoop.instance().start()

^ Дословно скопировано из документа .

Если вы пойдете по этому пути, единственное, что нужно, - это закодировать вашу очередь запросов, чтобы обеспечить максимальное количество открытых соединений. В противном случае вы, скорее всего, столкнетесь с состоянием гонки при серьезной очистке.

Прошло ~ 1 год с тех пор, как я сам прикоснулся к Торнадо, поэтому, пожалуйста, дайте мне знать, есть ли неточности в этом ответе, и я пересмотрю.

1 голос
/ 31 августа 2014

Похоже, вы пишете что-то вроде веб-сканера. Ваша проблема вызвана тайм-аутом напрямую, но в глубине, связана с параллельным паттерном торнадо.

Конечно, AsyncHTTPClient в торнадо может автоматически ставить запросы в очередь. На самом деле, AsyncHTTPClient отправит 10 запросов (по умолчанию) в пакете, и заблокирует ожидание их результата, а затем отправит следующий пакет. Запросы внутри пакета не являются блочными и обрабатываются параллельно, но это блок между партиями. И обратный вызов для каждого запроса вызывается не сразу после того, как этот запрос был выполнен, а после того, как этот пакет запросов был выполнен, а затем вызвал 10 обратных вызовов.

Возвращаясь к вашей проблеме, вам не нужно использовать ioloop.PeriodicCallback для пошагового выполнения запросов, поскольку AsyncHTTPClient в торнадо может автоматически ставить запросы в очередь. Вы можете назначить все запросы за один раз, позвольте AsyncHTTPClient планировать запросы.

Но возникает проблема, связанная с тем, что запросы в очереди ожидания по-прежнему потребляют время ожидания ! Потому что запросы блокируются между пакетами . Более поздние запросы здесь просто блокируются и отправляют партию за партией, а не помещают их в специальную готовую очередь и отправляют новые запросы после получения ответа.

Поэтому время ожидания по умолчанию, установленное на 20 с, слишком короткое, если запланировано много запросов. Если вы просто делаете демонстрацию, вы можете напрямую установить время ожидания на float('inf'). Если вы делаете что-то серьезное, вы должны использовать попытку / исключение повторного цикла.

Вы можете найти, как установить тайм-аут из tornado/httpclient.py, цитата здесь.

connect_timeout: время ожидания для первоначального подключения в секундах
request_timeout: время ожидания для всего запроса в секундах

В конце я пишу простую программу, которая использует AsyncHTTPClient для извлечения тысяч страниц из ZJU Online Judgment System. Вы можете попробовать это, а затем переписать своему сканеру. В моей сети это может принести 2800 страниц за 2 минуты. Очень хорошие результаты, в 10 раз (точно соответствуют размеру параллели) быстрее, чем последовательная выборка.

#!/usr/bin/env python
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.ioloop import IOLoop

baseUrl = 'http://acm.zju.edu.cn/onlinejudge/showProblem.do?problemCode='

start = 1001
end = 3800
count = end - start
done = 0

client = AsyncHTTPClient()

def onResponse(response):
    if response.error:
        print('Error: %s' % response.error)
    else:
        global done
        done += 1
        #It is comment out here, you could uncomment it and watch something interest, that len(client.queue) is reduce 10 by 10.
        #print('Queue length %s, Active client count %s, Max active clients limit %s' % (len(client.queue), len(client.active), client.max_clients))
        print('Received %s, Content length %s, Done %s' % (response.effective_url[-4:], len(response.body), done))
        if(done == count):
            IOLoop.instance().stop()

for i in range (start, end):
    request = HTTPRequest(baseUrl + str(i), connect_timeout=float('inf'), request_timeout=float('inf'))
    client.fetch(request, onResponse)
    print('Generated %s' % i)

IOLoop.instance().start()

Дополнительно:

Если у вас есть много страниц для загрузки, и вы из тех людей, которые стремятся к максимальной производительности, вы можете посмотреть Twisted. Я пишу ту же программу с Twisted и вставляю ее в мой Gist . Результат потрясающий: загрузите 2800 страниц за 40 секунд.

...