Мы используем сельдерей для получения данных о рейсах от различных туристических агентств, каждый запрос занимает ~ 20-30 секунд (большинству агентств требуется последовательность запросов - авторизация, отправка запроса, опрос результатов).
Обычная задача сельдерея выглядит следующим образом:
from eventlet.green import urllib2, time
def get_results(attr, **kwargs):
search, provider, minprice = attr
data = XXX # prepared data
host = urljoin(MAIN_URL, "RPCService/Flights_SearchStart")
req = urllib2.Request(host, data, {'Content-Type': 'text/xml'})
try:
response_stream = urllib2.urlopen(req)
except urllib2.URLError as e:
return [search, None]
response = response_stream.read()
rsp_host = urljoin(MAIN_URL, "RPCService/FlightSearchResults_Get")
rsp_req = urllib2.Request(rsp_host, response, {'Content-Type':
'text/xml'})
ready = False
sleeptime = 1
rsp_response = ''
while not ready:
time.sleep(sleeptime)
try:
rsp_response_stream = urllib2.urlopen(rsp_req)
except urllib2.URLError as e:
log.error('go2see: results fetch failed for %s IOError %s'%
(search.id, str(e)))
else:
rsp_response = rsp_response_stream.read()
try:
rsp = parseString(rsp_response)
except ExpatError as e:
return [search, None]
else:
ready = rsp.getElementsByTagName('SearchResultEx')
[0].getElementsByTagName('IsReady')[0].firstChild.data
ready = (ready == 'true')
sleeptime += 1
if sleeptime > 10:
return [search, None]
hash = "%032x" % random.getrandbits(128)
open(RESULT_TMP_FOLDER+hash, 'w+').write(rsp_response)
# call to parser
parse_agent_results.apply_async(queue='parsers', args=[__name__,
search, provider, hash])
Эти задачи запускаются в пуле событий с конкурентностью 300, prefetch_multiplier = 1
, broker_limit = 300
Когда задача ~ 100-200 извлекается из очереди- загрузка ЦП возрастает до 100% (используется все ядро ЦП), а выборка задач из очереди выполняется с задержками.
Не могли бы вы указать на возможные проблемы - операции блокировки (eventlet ALARM DETECTOR
не дает исключений), неправильная архитектура или что-то еще.