Паутина торнадо с mysql и редисом - PullRequest
0 голосов
/ 26 апреля 2018

У меня есть сервер торнадо, который будет создавать веб-сокет-соединение с клиентами. Всякий раз, когда клиент запрашивает какие-либо данные, мне нужно получить их из Redis или MySQL-DB. В дополнение к этому мне нужно прослушивать трансляцию с сервера торнадо, получать сетевые пакеты и отправлять их клиентам, если они подписаны на данные. Эта отправка широковещательных пакетов клиенту зависит от токена, который будет находиться в пакете. Если клиенты подписаны на токен, мы должны отправить ему пакет.

Запросить запрос:

  1. 5000 активных веб-сокетов (которые могут увеличиться)
  2. 1-БД запросов на сокет-соединение в секунду, всего 5000 БД-запросов в секунду
  3. 1-Redis-запрос на сокет-соединение в секунду, всего 5000 Redis-запросов / секунду.
  4. В эфире я должен прослушивать 1000 пакетов в секунду и проверять, подписан ли кто-либо из пользователей на токен.

Я знаю, что redis однопоточный и работает асинхронно (поправьте меня, если я ошибаюсь). Для MySQL-DB асинхронный драйвер, который я использую, это `tormysql` (большинство моих вызовов к БД - это запросы на выборку, а не сложные операции с БД.).

Мой вопрос:

  1. Будет ли блокировать вызов MySQL-DB? Если они блокируют вызов, могу ли я запустить другой поток / процесс только для запросов к БД?
  2. Есть ли вероятность, что торнадо отбросит пакеты в эфир?
  3. У меня может быть балансировщик нагрузки перед моими ресурсами и сервер их, но возможно ли, что я могу использовать один ЦП с 2-ядерным 8-ГБ ОЗУ?

Обновление 1 :
Я написал код для MySQL:
Server.py

import logging
import tornado.escape
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.websocket
import os.path
import uuid
import sys
from tornado import gen

from tornado.options import define, options

import tormysql ## MySQL async driver
pool = tormysql.ConnectionPool(
    max_connections = 20, #max open connections
    idle_seconds = 7200, #conntion idle timeout time, 0 is not timeout
    wait_connection_timeout = 3, #wait connection timeout
    host = "127.0.0.1",
    user = "test",
    passwd = "",
    db = "testdb",
    charset = "utf8"
)


define("port", default=8000, help="run on the given port", type=int)

class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", MainHandler),
            (r"/dataSock", ChatSocketHandler),
        ]
        settings = dict(
            cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            xsrf_cookies=True,
        )
        super(Application, self).__init__(handlers, **settings)

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("indexWS.html")

class ChatSocketHandler(tornado.websocket.WebSocketHandler):
    openConnCount = 0
    def get_compression_options(self):
        # Non-None enables compression with default options.
        return {}

    def open(self):
        # print("Socket open:%s"%(self))
        ChatSocketHandler.openConnCount += 1
        None
    def on_close(self):
        # print("Socket closed:%s"%(self))
        ChatSocketHandler.openConnCount -= 1
        None

    async def on_message(self, message):
        logging.info("open conn count %r", ChatSocketHandler.openConnCount)
        returnDB = await getDataFromDB()
        self.write_message("server got this from you:%s"%(str(returnDB)))

@gen.coroutine
def getDataFromDB():
    with (yield pool.Connection()) as conn:
        try:
            with conn.cursor() as cursor:
                yield cursor.execute("SHOW TABLES;")
                # print(cursor.fetchall())
        except:
            yield conn.rollback()
        else:
            yield conn.commit()

        with conn.cursor() as cursor:
            yield cursor.execute("SELECT * FROM theme;")
            datas = cursor.fetchall()

    return datas

    # yield pool.close()
def main():
    tornado.options.parse_command_line()
    app = Application()
    # print "options:", options
    # sys.exit()
    app.listen(options.port)
    print("PORT:%s"%(options.port))
    tornado.ioloop.IOLoop.current().start()

if __name__ == "__main__":
    main()

Теперь, когда я проверяю этот код следующим образом:
Client.py

import asyncio
import websockets

async def hello(i):
    async with websockets.connect('ws://localhost:8000/dataSock') as websocket:
        name = 'A'#input("What's your name? ")
        print("******************************%s******************************"%(i))
        for j in range(100):
            await websocket.send(name)
            # print("> {}".format(name))

            greeting = await websocket.recv()
            print("{}: {}".format(i, len(greeting)))
            asyncio.sleep(10)

async def start():
    for i in range(10):
        await hello(i)
    print("end")
    asyncio.sleep(20)
asyncio.get_event_loop().run_until_complete(start())
# asyncio.get_event_loop().run_forever()

Если я запускаю один экземпляр кода, все работает хорошо. Когда я увеличиваю количество клиентов до 70 (70 экземпляров клиента), возникает задержка в ответе, который я получаю.

2-й вопрос объяснения:
Сервер Торнадо должен прослушивать какой-либо порт, в котором я буду получать сетевые пакеты, тот, который я должен отправлять клиентам, если они подписаны. Так будет ли вероятность, что эти пакеты будут отброшены?

1 Ответ

0 голосов
/ 27 апреля 2018
  1. Будет ли блокировать вызов MySQL-DB? Если они блокируют вызов, могу ли я запустить другой поток / процесс только для запросов к БД?

Как вы упомянули, вы используете tormysql в качестве драйвера, поэтому нет, вызовы не будут блокироваться, поскольку tormysql является асинхронным.

  1. Есть ли вероятность, что торнадо отбросит пакеты в эфир?

Я не совсем понимаю, что вы имеете в виду под этим. Но так как протокол websocket построен на TCP, то доставка всех пакетов гарантирована. TCP позаботится об этом.

  1. У меня может быть балансировщик нагрузки перед моими ресурсами и сервер их, но возможно ли, что я могу использовать один ЦП с 2-ядерным 8-ГБ ОЗУ?

Да.


Я думаю, что вы слишком обдумали свое заявление на данный момент. Преждевременная оптимизация - это зло. Вы еще не написали код, и вы думаете о производительности. Это просто тратить ваше время.

Сначала напишите код, а затем проведите стресс-тестирование, чтобы увидеть, какую нагрузку может обработать ваше приложение. Затем выполните профилирование, чтобы увидеть, где все замедляется. Затем вы оптимизируете код или измените настройки и, возможно, обновите свое оборудование.

Но просто думать о производительности без написания и стресс-тестирования кода - просто трата времени.

...