Tornado AsyncHTTPClient снижение производительности - PullRequest
0 голосов
/ 26 марта 2019

Настройка: Python 2.7.15, Tornado 5.1

У меня есть веб-сервер, который обрабатывает ~ 40 /recommend запросов в секунду. Среднее время ответа составляет 25 мс, но существует большое расхождение (некоторые запросы могут занимать более 500 мс).

Каждый запрос внутренне генерирует от 1 до 8 запросов Elasticsearch (HTTP-запросов). Каждый запрос Elasticsearch может занять от 1 до 150 мс.

Запросы Elasticsearch обрабатываются синхронно с помощью библиотеки asticsearch-dsl .

Цель состоит в том, чтобы сократить время ожидания ввода / вывода (запросы к Elasticsearch) и обрабатывать больше запросов в секунду, чтобы я мог уменьшить количество машин. Одно недопустимо - я не хочу увеличивать среднее время обработки (25 мс).

Я нашел некоторые реализации торнадо-эластичного поиска в сети, но, поскольку мне нужно использовать только одну конечную точку для Elasticsearch (/_search), я пытаюсь сделать это в одиночку.

Ниже представлена ​​вырожденная реализация моего веб-сервера. При той же нагрузке (~ 40 запросов в секунду) среднее время ответа на запрос увеличилось до 200 мс !

Копаясь, я вижу, что внутреннее время асинхронной обработки (запросы к Elasticsearch) нестабильно, и время, необходимое для каждого вызова fetch, может быть различным, а общее среднее значение (в ab нагрузочном тесте) высоко .

Я использую ab для моделирования нагрузки и ее внутреннего измерения, печатая текущее время обработки fetch, среднее время обработки fetch и максимальное время обработки. При выполнении одного запроса за раз (параллелизм 1): ab -p es-query-rcom.txt -T application/json -n 1000 -c 1 -k 'http://localhost:5002/recommend'

мои отпечатки выглядят так: [avg req_time: 3, dur: 3] [current req_time: 2, dur: 3] [max req_time: 125, dur: 125] reqs: 8000

Но когда я пытаюсь увеличить параллелизм (до 8): ab -p es-query-rcom.txt -T application/json -n 1000 -c 8 -k 'http://localhost:5002/recommend'

теперь мои отпечатки выглядят так: [avg req_time: 6, dur: 13] [current req_time: 4, dur: 4] [max req_time: 73, dur: 84] reqs: 8000

Среднее требование теперь x2 медленнее (или x4 по моим измерениям)! Что мне здесь не хватает? почему я вижу эту деградацию?

web_server.py:

import tornado
from tornado.httpclient import AsyncHTTPClient
from tornado.options import define, options
from tornado.httpserver import HTTPServer
from web_handler import WebHandler

SERVICE_NAME = 'web_server'
NUM_OF_PROCESSES = 1


class Statistics(object):
    def __init__(self):
        self.total_requests = 0
        self.total_requests_time = 0
        self.total_duration = 0
        self.max_time = 0
        self.max_duration = 0


class RcomService(object):
    def __init__(self):
        print 'initializing RcomService...'
        AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=3)
        self.stats = Statistics()

    def start(self, port):
        define("port", default=port, type=int)
        db = self.get_db(self.stats)
        routes = self.generate_routes(db)
        app = tornado.web.Application(routes)
        http_server = HTTPServer(app, xheaders=True)
        http_server.bind(options.port)
        http_server.start(NUM_OF_PROCESSES)
        tornado.ioloop.IOLoop.current().start()

    @staticmethod
    def generate_routes(db):
        return [
            (r"/recommend", WebHandler, dict(db=db))
        ]

    @staticmethod
    def get_db(stats):
        return {
            'stats': stats
        }


def main():
    port = 5002
    print('starting %s on port %s', SERVICE_NAME, port)

    rcom_service = RcomService()
    rcom_service.start(port)


if __name__ == '__main__':
    main()

web_handler.py:

import time
import ujson
from tornado import gen
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient
from tornado.web import RequestHandler


class WebHandler(RequestHandler):
    def initialize(self, db):
        self.stats = db['stats']

    @coroutine
    def post(self, *args, **kwargs):
        result = yield self.wrapper_innear_loop([{}, {}, {}, {}, {}, {}, {}, {}])  # dummy queries (empty)
        self.write({
            'res': result
        })

    @coroutine
    def wrapper_innear_loop(self, queries):
        result = []
        for q in queries:  # queries are performed serially 
            res = yield self.async_fetch_gen(q)
            result.append(res)
        raise gen.Return(result)

    @coroutine
    def async_fetch_gen(self, query):
        url = 'http://localhost:9200/my_index/_search'

        headers = {
                'Content-Type': 'application/json',
                'Connection': 'keep-alive'
        }

        http_client = AsyncHTTPClient()
        start_time = int(round(time.time() * 1000))
        response = yield http_client.fetch(url, method='POST', body=ujson.dumps(query), headers=headers)
        end_time = int(round(time.time() * 1000))
        duration = end_time - start_time
        body = ujson.loads(response.body)
        request_time = int(round(response.request_time * 1000))
        self.stats.total_requests += 1
        self.stats.total_requests_time += request_time
        self.stats.total_duration += duration
        if self.stats.max_time < request_time:
            self.stats.max_time = request_time
        if self.stats.max_duration < duration:
            self.stats.max_duration = duration
        duration_avg = self.stats.total_duration / self.stats.total_requests
        time_avg = self.stats.total_requests_time / self.stats.total_requests
        print "[avg req_time: " + str(time_avg) + ", dur: " + str(duration_avg) + \
              "] [current req_time: " + str(request_time) + ", dur: " + str(duration) + "] [max req_time: " + \
              str(self.stats.max_time) + ", dur: " + str(self.stats.max_duration) + "] reqs: " + \
              str(self.stats.total_requests)
        raise gen.Return(body)

Я попытался немного поиграть с асинхронным классом (Simple против curl), размером max_clients, но я не понимаю, какая мелодия лучше всего подходит для моего случая. Но

1 Ответ

0 голосов
/ 27 марта 2019

Увеличение времени может быть вызвано тем, что при параллельности == 1 ЦП был недостаточно загружен, а при c == 8 он используется на 100% + и не может выполнить все запросы. Например, абстрактный ЦП может обрабатывать 1000 операций в секунду, для отправки запроса требуется 50 операций ЦП, а для считывания результата запроса требуется также 50 операций ЦП. Когда у вас есть 5 RPS, ваш процессор используется на 50%, а среднее время запроса составляет 50 мс (для отправки запроса) + время запроса + 50 мс (для чтения запроса). Но когда у вас есть, например, 40 RPS (в 8 раз больше, чем 5 RPS), ваш ЦП будет перегружен на 400%, и некоторые завершенные запросы будут ожидать анализа, поэтому среднее время запроса теперь составляет 50 мс + запрос время + время ожидания процессора + 50 мс.

Подводя итог, я бы посоветовал проверить загрузку ЦП при обеих нагрузках и, чтобы быть уверенным, в профиле, сколько времени требуется для отправки запроса и разбора ответа, ЦП может быть вашим узким местом.

...