Вызов асинхронной конечной точки блокируется другим потоком - PullRequest
2 голосов
/ 27 октября 2019

У меня есть веб-служба торнадо, которая будет обслуживать около 500 запросов в минуту. Все эти запросы будут направлены на одну конкретную конечную точку. Существует программа C++, которую я скомпилировал с использованием Cython и использую ее внутри службы торнадо в качестве процессора моего процессора. Каждый запрос к /check/ будет вызывать вызов функции в программе C++ (я назову это handler), и возвращаемое значение будет отправлено пользователю в качестве ответа.

Вот как яоберните handler класс. Важным моментом является то, что я не создаю экземпляр handler в __init__. В моем коде торнадо есть еще один маршрут, с которого я хочу начать загрузку DataStructure после того, как запрос будет обработан автоматически. (например, /reload/)


executors = ThreadPoolExecutor(max_workers=4)


class CheckerInstance(object):
    def __init__(self, *args, **kwargs):
        self.handler = None
        self.is_loading = False
        self.is_live = False

    def init(self):
        if not self.handler:
            self.handler = pDataStructureHandler()
            self.handler.add_words_from_file(self.data_file_name)
            self.end_loading()
            self.go_live()

    def renew(self):
        self.handler = None
        self.init()



class CheckHandler(tornado.web.RequestHandler):
    async def get(self):
        query = self.get_argument("q", None).encode('utf-8')
        answer = query

        if not checker_instance.is_live:
            self.write(dict(answer=self.get_argument("q", None), confidence=100))
            return

        checker_response = await checker_instance.get_response(query)
        answer = checker_response[0]
        confidence = checker_response[1]

        if self.request.connection.stream.closed():
            return
        self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))

    def on_connection_close(self):
        self.wait_future.cancel()


class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
    def prepare(self):
        self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')

    def new_file_exists(self):
        return True

    def can_reload(self):
        return not checker_instance.is_loading

    def get(self):
        error = False
        message = None

        if not self.can_reload():
            error = True
            message = 'another job is being processed!'
        else:
            if not self.new_file_exists():
                    error = True
                    message = 'no new file found!'
            else:
                checker_instance.go_fake()
                checker_instance.start_loading()
                tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
                message = 'job started!'

        if self.request.connection.stream.closed():
            return
        self.write(dict(
            success=not error, message=message
        ))

    def on_connection_close(self):
        self.wait_future.cancel()


def main():
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/check", CheckHandler),
            (r"/reload", InstanceReloadHandler),
            (r"/health", HealthHandler),
            (r"/log-event", SubmitLogHandler),
        ],
        debug=options.debug,
    )
    checker_instance = CheckerInstance()

Я хочу, чтобы эта служба продолжала отвечать на запросы после запуска checker_instance.renew в другом потоке. Но это не то, что происходит. Когда я нажимаю на конечную точку /reload/ и функция renew начинает работать, любой запрос к /check/ останавливается и ждет завершения процесса перезагрузки, а затем снова начинает работать. Когда DataStructure загружается, служба должна находиться в режиме fake и отвечать людям с тем же запросом, который они отправляют в качестве ввода.

Я тестировал этот код в моей среде разработки с процессором i5 (4 ядра процессора) и все работает просто отлично! Но в производственной среде (3 двухпоточных ядра ЦП) конечная точка /check/ останавливает запросы.

1 Ответ

2 голосов
/ 29 октября 2019

Трудно полностью отследить события, которые обрабатываются, потому что вы вырезали часть кода для краткости. Например, я не вижу здесь реализации get_response, поэтому я не знаю, ожидает ли она чего-то самого, что может зависеть от состояния checker_instance.

Одна область, которую я хотел бы изучить, находится впотокобезопасность (или кажущееся отсутствие) при передаче от checker_instance.renew до run_in_executor. Это вызывает сомнения у меня, потому что вы изменяете состояние одного экземпляра CheckerInstance из отдельного потока. Несмотря на то, что это может явно не нарушать ситуацию, похоже, что это может представлять странные условия гонки или непредвиденные копии памяти, которые могут объяснить непредвиденное поведение, с которым вы сталкиваетесь

Если возможно, я бы сделал все, что бы вы ни использовали при загрузкечто вы хотите, чтобы выгрузка в поток была полностью автономной, а когда данные загружаются, возвращайте их как результат функции, который затем может быть возвращен в ваш checker_instance. Если бы вы делали это с кодом «как есть», вы бы хотели дождаться вызова run_in_executor его результата и затем обновить checker_instance. Это будет означать, что запрос перезагрузки GET будет ждать, пока данные не будут загружены. В качестве альтернативы, в вашем запросе перезагрузки GET вы можете ioloop.spawn_callback функции, которая запускает run_in_executor таким образом, позволяя завершить запрос перезагрузки вместо ожидания.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...