Прокси-сервер Tornado API и логирование ответа в Kafka - PullRequest
0 голосов
/ 26 мая 2020

Я создаю легкий прокси-сервер с использованием Tronado, который записывает запрос + ответ в очередь Kafka. Мой клиентский запрос -> Прокси-сервер Tornado (я разрабатываю) -> API-интерфейс TensorFlow, возвращающий время отклика клиента, не может превышать 20 миллисекунд.

Tornado версии 6.0.4 Kafka- python

При использовании только прокси-сервера Tornado без регистрации в Kafka время отклика составляет менее 14 мс, при ведении журнала Kafka время отклика достигает 20000 мс, что неприемлемо. Как уменьшить время ответа с помощью Kafka publi sh? Я новичок ie с фреймворком Tornado.

Любая помощь с этим приветствуется.

Я видел этот пост здесь и кажется, что библиотека Kiel python иметь обновления, сделанные 4 года назад, хорошо ли это использовать? ИЛИ есть ли у нас лучшие способы добиться этого?

Код основного сервера:

from tornado.web import Application, RequestHandler
from tornado.ioloop import IOLoop
from model_proxy_server.tfs_request import request
from tornado import gen
from model_proxy_server.model_request_response_publisher import publish_message
import json

class PostToTfs(RequestHandler):

@gen.coroutine
def post (self, *args, **kwargs):
    kafka_msg = {}

    model_url = 'http://localhost:8501' +  self.request.path

    response = yield request(self.request.body, model_url)

    resp = json.loads(response)
    yield self.write(resp)

    kafka_msg['request_url'] = model_url
    kafka_msg['request'] = json.loads(self.request.body)
    kafka_msg['response'] = resp
    msg = json.dumps(kafka_msg)

    # Publish the request and response to Kafka topic
    yield publish_message(msg, 'dev-ml-model-logs')


def make_app():
    # TODO Get to know on how to host this in PROD with common URL for this APP
    urls = [(r"/.*", PostToTfs)]
    return Application(urls, debug=True)

def main():
    app = make_app()
    app.listen(3000)
    IOLoop.instance().start()

if __name__ == '__main__':
    main()

Обработчик запросов:

import tornado.httpclient
from tornado.ioloop import IOLoop
from tornado import gen
import tornado.options
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

@gen.coroutine
def json_fetch(http_client, body, model_url):
    try:
        response = yield http_client.fetch(f"{model_url}", method='POST', body=body)
    except Exception as e:

        logging.info("Exception on http_client fetch from url")
        logging.error("Error: " + str(e))

    raise gen.Return(response)

@gen.coroutine
def request(body, model_url):

    http_client = tornado.httpclient.AsyncHTTPClient()
    http_response = yield json_fetch(http_client, body, model_url)
    return http_response.body

if __name__ == "__main__":
    tornado.options.parse_command_line()
    IOLoop.instance().run_sync(request)

Издатели Kafka:

from kafka import KafkaProducer
from tornado import gen
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

@gen.coroutine
def publish_message(msg, topic_name):
    producer_instance = _kafka_connect()
    msg_bytes = bytes(msg, encoding='utf-8')

    try:
        yield producer_instance.send(f'{topic_name}', msg_bytes)
        logging.info("Message published successfully")
    except Exception as e:
        logging.info("Exception in publishing message")
        logging.error(str(e))

@gen.coroutine
def _kafka_connect():

    try:
         producer = yield KafkaProducer(bootstrap_servers='ec2prdkafka01:9093', api_version=(0, 10))

    except Exception as e:
        logging.info("Exception while connecting to Kafka")
        logging.error(str(e))

    return producer

1 Ответ

0 голосов
/ 16 июня 2020

Хорошо, вот как я решил проблему и увеличил время ответа с 20000 мс моего прокси-сервера до 20 мс.

Издатели Kafka создают соединение с Kafka каждый раз, когда вызывается этот модуль , что было узким местом во времени ответа.

Итак, я установил соединение с Kafka при запуске прокси-сервера, как показано ниже. так что его можно использовать напрямую

def post (self, *args, **kwargs):

def initilize(self, producer):
    self.producer = producer 

Надеюсь, это поможет кому-то, кто ищет такую ​​информацию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...