Я создаю легкий прокси-сервер с использованием Tronado, который записывает запрос + ответ в очередь Kafka. Мой клиентский запрос -> Прокси-сервер Tornado (я разрабатываю) -> API-интерфейс TensorFlow, возвращающий время отклика клиента, не может превышать 20 миллисекунд.
Tornado версии 6.0.4 Kafka- python
При использовании только прокси-сервера Tornado без регистрации в Kafka время отклика составляет менее 14 мс, при ведении журнала Kafka время отклика достигает 20000 мс, что неприемлемо. Как уменьшить время ответа с помощью Kafka publi sh? Я новичок ie с фреймворком Tornado.
Любая помощь с этим приветствуется.
Я видел этот пост здесь и кажется, что библиотека Kiel python иметь обновления, сделанные 4 года назад, хорошо ли это использовать? ИЛИ есть ли у нас лучшие способы добиться этого?
Код основного сервера:
from tornado.web import Application, RequestHandler
from tornado.ioloop import IOLoop
from model_proxy_server.tfs_request import request
from tornado import gen
from model_proxy_server.model_request_response_publisher import publish_message
import json
class PostToTfs(RequestHandler):
@gen.coroutine
def post (self, *args, **kwargs):
kafka_msg = {}
model_url = 'http://localhost:8501' + self.request.path
response = yield request(self.request.body, model_url)
resp = json.loads(response)
yield self.write(resp)
kafka_msg['request_url'] = model_url
kafka_msg['request'] = json.loads(self.request.body)
kafka_msg['response'] = resp
msg = json.dumps(kafka_msg)
# Publish the request and response to Kafka topic
yield publish_message(msg, 'dev-ml-model-logs')
def make_app():
# TODO Get to know on how to host this in PROD with common URL for this APP
urls = [(r"/.*", PostToTfs)]
return Application(urls, debug=True)
def main():
app = make_app()
app.listen(3000)
IOLoop.instance().start()
if __name__ == '__main__':
main()
Обработчик запросов:
import tornado.httpclient
from tornado.ioloop import IOLoop
from tornado import gen
import tornado.options
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@gen.coroutine
def json_fetch(http_client, body, model_url):
try:
response = yield http_client.fetch(f"{model_url}", method='POST', body=body)
except Exception as e:
logging.info("Exception on http_client fetch from url")
logging.error("Error: " + str(e))
raise gen.Return(response)
@gen.coroutine
def request(body, model_url):
http_client = tornado.httpclient.AsyncHTTPClient()
http_response = yield json_fetch(http_client, body, model_url)
return http_response.body
if __name__ == "__main__":
tornado.options.parse_command_line()
IOLoop.instance().run_sync(request)
Издатели Kafka:
from kafka import KafkaProducer
from tornado import gen
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@gen.coroutine
def publish_message(msg, topic_name):
producer_instance = _kafka_connect()
msg_bytes = bytes(msg, encoding='utf-8')
try:
yield producer_instance.send(f'{topic_name}', msg_bytes)
logging.info("Message published successfully")
except Exception as e:
logging.info("Exception in publishing message")
logging.error(str(e))
@gen.coroutine
def _kafka_connect():
try:
producer = yield KafkaProducer(bootstrap_servers='ec2prdkafka01:9093', api_version=(0, 10))
except Exception as e:
logging.info("Exception while connecting to Kafka")
logging.error(str(e))
return producer