celery + eventlet = 100% загрузка процессора - PullRequest
0 голосов
/ 14 марта 2012

Мы используем сельдерей для получения данных о рейсах от различных туристических агентств, каждый запрос занимает ~ 20-30 секунд (большинству агентств требуется последовательность запросов - авторизация, отправка запроса, опрос результатов).

Обычная задача сельдерея выглядит следующим образом:

from eventlet.green import urllib2, time 
def get_results(attr, **kwargs): 
    search, provider, minprice = attr 
    data = XXX # prepared data 
    host = urljoin(MAIN_URL, "RPCService/Flights_SearchStart") 
    req = urllib2.Request(host, data, {'Content-Type': 'text/xml'}) 
    try: 
        response_stream = urllib2.urlopen(req) 
    except urllib2.URLError as e: 
        return [search, None] 
    response = response_stream.read() 
    rsp_host = urljoin(MAIN_URL, "RPCService/FlightSearchResults_Get") 
    rsp_req = urllib2.Request(rsp_host, response, {'Content-Type': 
'text/xml'}) 
    ready = False 
    sleeptime = 1 
    rsp_response = '' 
    while not ready: 
        time.sleep(sleeptime) 
        try: 
            rsp_response_stream = urllib2.urlopen(rsp_req) 
        except urllib2.URLError as e: 
            log.error('go2see: results fetch failed for %s IOError %s'% 
(search.id, str(e))) 
        else: 
            rsp_response = rsp_response_stream.read() 
            try: 
                rsp = parseString(rsp_response) 
            except ExpatError as e: 
                return [search, None] 
            else: 
                ready = rsp.getElementsByTagName('SearchResultEx') 
[0].getElementsByTagName('IsReady')[0].firstChild.data 
                ready = (ready == 'true') 
        sleeptime += 1 
        if sleeptime > 10: 
            return [search, None] 
    hash = "%032x" % random.getrandbits(128) 
    open(RESULT_TMP_FOLDER+hash, 'w+').write(rsp_response) 
   # call to parser 
    parse_agent_results.apply_async(queue='parsers', args=[__name__, 
search, provider, hash]) 

Эти задачи запускаются в пуле событий с конкурентностью 300, prefetch_multiplier = 1, broker_limit = 300 Когда задача ~ 100-200 извлекается из очереди- загрузка ЦП возрастает до 100% (используется все ядро ​​ЦП), а выборка задач из очереди выполняется с задержками.

Не могли бы вы указать на возможные проблемы - операции блокировки (eventlet ALARM DETECTOR не дает исключений), неправильная архитектура или что-то еще.

Ответы [ 2 ]

0 голосов
/ 10 января 2013

Извините за поздний ответ.

Первое, что я хотел бы попробовать в такой ситуации, - это полностью отключить Eventlet как в Celery, так и в вашем коде, использовать модель процесса или потока ОС. 300 потоков или даже процессов - не такая большая нагрузка для планировщика ОС (хотя вам может не хватить памяти для запуска многих процессов). Так что я бы попробовал и посмотрел, резко ли падает загрузка процессора. Если это не так, то проблема в вашем коде, и Eventlet не может волшебным образом ее исправить. Однако, если он все-таки упадет, нам нужно изучить проблему ближе.

Если ошибка сохраняется, сообщите об этом одним из следующих способов:

0 голосов
/ 14 марта 2012

Проблема возникает, если вы запускаете 200 запросов к серверу, ответы могут быть отложены, и поэтому urllib.urlopen будет зависать.

Еще одна вещь, которую я заметил: если возникает URLError, программа остается в цикле while до тех пор, пока время сна не станет больше 10. Таким образом, ошибка URLError позволит этому сценарию бездействовать в течение 55 секунд (1 + 2 + 3 и т. Д.)

...