Несколько асинхронных HTTP-подключений к Tornado Server - PullRequest
0 голосов
/ 07 ноября 2018

У меня есть сервер торнадо, который я пытаюсь сделать синхронным. У меня есть клиент, который делает асинхронные запросы к серверу одновременно. Он пингует сервер каждые 5 секунд с помощью пульса и, во-вторых, он делает GET-запрос на задание, когда может.

На стороне сервера есть потокобезопасная очередь, которая содержит задания. Блокируется на 20 секунд, если очередь пуста. Я хочу, чтобы он удерживал соединение и блокировал эти 20 секунд, а когда он возвращается, он пишет «Нет работы» клиенту. Как только работа становится доступной, она должна немедленно написать ее клиенту, так как queue.get () вернется. Я хочу, чтобы пульс продолжал происходить в фоновом режиме, пока этот запрос заблокирован. Здесь я делаю два асинхронных запроса к серверу от одного и того же клиента.

Вот пример проекта, который я создаю, который имитирует мою проблему.

Сервер:

import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen

q = Queue()


class HeartBeatHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def post(self):
        print("Heart beat")


class JobHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        print("Job")
        try:
            job = yield q.get(block=True, timeout=20)
            self.write(job)
        except Exception as e:
            self.write("No job")


def make_app():
    return tornado.web.Application([
        (r"/heartbeat", HeartBeatHandler),
        (r"/job", JobHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    try:
        tornado.ioloop.IOLoop.current().start()
    except KeyboardInterrupt:
        tornado.ioloop.IOLoop.current().stop()

Клиент:

import asyncio
from tornado import httpclient, gen


@gen.coroutine
def heartbeat_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient()
        heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
                                                   body="")
        try:
            yield http_client.fetch(heartbeat_request)
            yield asyncio.sleep(5)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))

        http_client.close()


@gen.coroutine
def worker_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
        job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
        try:
            response = yield http_client.fetch(job_request)
            print(response.body)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))

        http_client.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(heartbeat_routine())
    asyncio.ensure_future(worker_routine())
    loop.run_forever()

Вопросы:

  1. Проблема в том, что пульс также блокируется на эти 20 секунд. в то время как queue.get () блокируется. Который я не хочу.
  2. Как вы можете видеть в моем клиенте, я установил время ожидания запроса на 180 секунд. Но это кажется, никогда не работает с торнадо. Если вы увеличиваете queue.get () тайм-аут выше 20 секунд, он возвращает код ошибки, сообщающий, что время запроса истекло.

1 Ответ

0 голосов
/ 09 ноября 2018
  1. Если вы используете потокобезопасную очередь, вы должны использовать не использовать операции блокировки из потока IOLoop. Вместо этого запустите их в пуле потоков:

    job = yield IOLoop.current().run_in_executor(None, lambda: q.get(block=True, timeout=20))
    

    В качестве альтернативы, вы можете использовать асинхронную (но поточно-небезопасную) очередь Tornado и использовать IOLoop.add_callback всякий раз, когда вам нужно взаимодействовать с очередью из другого потока.

  2. В конструкторе AsyncHTTPClient есть какая-то магия, которая пытается по возможности делиться существующими экземплярами, но это означает, что аргументы конструктора эффективны только в первый раз. worker_routine выбирает экземпляры по умолчанию, созданные heartbeat_routine. Добавьте force_instance=True, чтобы убедиться, что вы получили новый клиент в worker_routine (и назовите .close(), когда закончите)

...