HTTP-сервер поддерживает поток в схеме вложенных потоков - PullRequest
0 голосов
/ 26 августа 2018

У меня есть небольшая реализация HTTPServer, которую я раскручиваю для прослушивания обратного вызова из API.В тестировании это подразумевает поддержание самого внутреннего потока.Вот сервер:

import http
import uuid
from http import server

class Server(server.HTTPServer):

    RequestLog:list = []
    ErrorList:list = []
    Serve:bool = True

    def __init__(self, server_address, RequestHandlerClass):

        self.RequestLog = []
        self.ErrorList = []
        self.Serve:bool = True

        return super().__init__(server_address, RequestHandlerClass)

    def LogRequest(self, clientAddress, success, state, params:dict={}):
        """docstring"""

        uid = uuid.uuid1()
        logItem = {"RequestID" : uid,
                   "ClientAddress" : clientAddress,
                   "Success" : success,
                   "State" : state,
                   "Params" : params}

        self.RequestLog.append(logItem)

    def GetRequestItem(self, state):
        """docstring"""

        logItem = {}
        if self.RequestLog and len(self.RequestLog):
            logItem = [d for d in self.RequestLog if d["State"] == state][0]

        return logItem

    def service_actions(self):

        try:
            if not self.Serve:
                self.shutdown()
                self.server_close()
        except Exception as e:
            err = e
            raise e

        return super().service_actions()

    def handle_error(self, request, client_address):


        logItem = {"clientAddress" : client_address,
                   "success" : False,
                   "state" : None,
                   "params" : None}

        try:
            self.LogRequest(**logItem)
            x = request
        except Exception as e:
            self.shutdown()
            err = e
            raise e

        return super().handle_error(request, client_address)

Итак, что делает вышеописанная реализация сервера, это записывает информацию о запросах в ResquestLog:list, а затем предоставляет метод GetRequestItem, который можно использовать для определения наличиязарегистрированный запрос.В тесте я выкидываю ошибку и ловлю ее с помощью переопределения handle_error().Вот вызывающая функция, которая раскручивает сервер, опрашивает запрос, а затем отключает сервер, устанавливая для его метода Server.Serve значение False

def AwaitCallback(self, server_class=Server,
                     handler_class=OAuthGrantRequestHandler):
        """docstring"""

        server_address = ("127.0.0.1", 8080)
        self.Httpd = server_class(server_address, handler_class)
        self.Httpd.timeout = 200
        t1 = threading.Thread(target=self.Httpd.serve_forever)

        try:
            t1.start()

            #poll for request result
            result = {}
            x = 0
            while x < self.Timeout:
                if len(self.Httpd.RequestLog) > 0:
                    break
                time.sleep(.5)

        finally:
            #Terminate Server
            if self.Httpd:
                self.Httpd.Serve = False
            if t1:
                t1.join()
    return

Вышеуказанный метод привязывается к вызову t1.join(),Проверка объекта self.Httpd при его зависании говорит мне, что серверный цикл serve_forever() отключен, но поток по-прежнему показывает его работоспособность при вызове t1.is_alive().Так, что происходит?Единственное, о чем я могу думать, это то, что когда в потоке t1 вызывается self.shutdown(), это действительно приводит к циклу, а не к его выключению и поддерживает протектор?Документация по отключению просто говорит shutdown() : Tell the serve_forever() loop to stop and wait until it does. Красиво и мрачно.Есть идеи?

Редактировать 1: ответ, предложенный на Как остановить BaseHTTPServer.serve_forever () в подклассе BaseHTTPRequestHandler? совсем другое.Они предлагают переопределить всю встроенную функциональность цикла socketserver.BaseServer.serve_forever () с более простой реализацией, тогда как я пытаюсь правильно использовать встроенную реализацию.Насколько я понимаю, пример моего рабочего кода выше должен достичь того же, что предлагает ответ, но дочерний поток не завершается.Таким образом, этот вопрос.

1 Ответ

0 голосов
/ 26 августа 2018

Мне нужен OP, чтобы проверить это, но проблема здесь в том, что http.server.HTTPServer основан на процессах, а не на потоках, и процесс зомби может быть создан.Фактически, forkingMixIn явно обрабатывает эту проблему :

serve_forever (poll_interval = 0.5)

Обрабатывает запросы до явного запроса shutdown ().Опрос для отключения каждые секунды poll_interval.Игнорирует атрибут тайм-аута.Он также вызывает service_actions (), который может использоваться подклассом или mixin для предоставления действий, специфичных для данной службы. Например, класс ForkingMixIn использует service_actions () для очистки дочерних процессов зомби.

Это включает в себя итерацию всех active_children и вызов функции discard.Вы можете проверить исходный код для socketserver.ForkingMixIn.collect_children (см. Внизу поста).Самым быстрым решением может быть использование ThreadingHTTPServer :

, класс http.server.ThreadingHTTPServer (server_address, RequestHandlerClass)

Этот класс идентичен HTTPServer, но использует потокиобрабатывать запросы с помощью ThreadingMixIn. Это полезно для работы с веб-браузерами, предварительно открывающими сокеты, для которых HTTPServer будет ждать бесконечно.

В случае, если вы хотите использовать обычный сервер

Вам нужно будет создать и обработать active_children и реализовать нечто похожее на метод, который ForkingMixIn вызывает для service_actions:

def collect_children(self):
    """Internal routine to wait for children that have exited."""
    if self.active_children is None:
        return

    # If we're above the max number of children, wait and reap them until
    # we go back below threshold. Note that we use waitpid(-1) below to be
    # able to collect children in size(<defunct children>) syscalls instead
    # of size(<children>): the downside is that this might reap children
    # which we didn't spawn, which is why we only resort to this when we're
    # above max_children.
    while len(self.active_children) >= self.max_children:
        try:
            pid, _ = os.waitpid(-1, 0)
            self.active_children.discard(pid)
        except ChildProcessError:
            # we don't have any children, we're done
            self.active_children.clear()
        except OSError:
            break

    # Now reap all defunct children.
    for pid in self.active_children.copy():
        try:
            pid, _ = os.waitpid(pid, os.WNOHANG)
            # if the child hasn't exited yet, pid will be 0 and ignored by
            # discard() below
            self.active_children.discard(pid)
        except ChildProcessError:
            # someone else reaped it
            self.active_children.discard(pid)
        except OSError:
            pass
...