Python Совместное использование сетевого сокета с многопроцессорной обработкой. Менеджер - PullRequest
2 голосов
/ 17 декабря 2010

В настоящее время я пишу модуль прокси-сервера nginx с очередью запросов впереди, поэтому запросы не сбрасываются, когда серверы за nginx не могут обрабатывать запросы (nginx настроен как балансировщик нагрузки).

Я использую

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler

Идея состоит в том, чтобы поместить запрос в очередь перед обработкой.Я знаю, что multiprocessing.Queue поддерживает только простой объект и не поддерживает необработанные сокеты, поэтому я попытался использовать multiprocess.Manager для создания общего словаря.Диспетчер также использует сокеты для соединения, поэтому этот метод тоже не удался.Есть ли способ разделить сетевые сокеты между процессами?Вот проблемная часть кода:

class ProxyServer(Threader, HTTPServer):

    def __init__(self, server_address, bind_and_activate=True):
        HTTPServer.__init__(self, server_address, ProxyHandler,
                bind_and_activate)

        self.manager = multiprocessing.Manager()

        self.conn_dict = self.manager.dict()
        self.ticket_queue = multiprocessing.Queue(maxsize= 10)
        self._processes = []
        self.add_worker(5)


    def process_request(self, request, client):
        stamp = time.time()
        print "We are processing"

        self.conn_dict[stamp] = (request, client) # the program crashes here


    #Exception happened during processing of request from ('172.28.192.34', 49294)
    #Traceback (most recent call last):
    #  File "/usr/lib64/python2.6/SocketServer.py", line 281, in _handle_request_noblock
    #    self.process_request(request, client_address)
    #  File "./nxproxy.py", line 157, in process_request
    #    self.conn_dict[stamp] = (request, client)
    #  File "<string>", line 2, in __setitem__
    #  File "/usr/lib64/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    #    conn.send((self._id, methodname, args, kwds))
    #TypeError: expected string or Unicode object, NoneType found

        self.ticket_queue.put(stamp)


    def add_worker(self, number_of_workers):
        for worker in range(number_of_workers):
            print "Starting worker %d" % worker
            proc = multiprocessing.Process(target=self._worker, args = (self.conn_dict,))
            self._processes.append(proc)
            proc.start()

    def _worker(self, conn_dict):
        while 1:
            ticket = self.ticket_queue.get()

            print conn_dict
            a=0
            while a==0:
                try:
                    request, client = conn_dict[ticket]
                    a=1
                except Exception:
                    pass
            print "We are threading!"
            self.threader(request, client)

Ответы [ 3 ]

7 голосов
/ 31 декабря 2011

U может использовать multiprocessing.reduction для передачи объектов соединения и сокета между процессами

Пример кода

# Main process
from multiprocessing.reduction import reduce_handle
h = reduce_handle(client_socket.fileno())
pipe_to_worker.send(h)

# Worker process
from multiprocessing.reduction import rebuild_handle
h = pipe.recv()
fd = rebuild_handle(h)
client_socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
client_socket.send("hello from the worker process\r\n") 
0 голосов
/ 05 февраля 2013

Вы можете посмотреть на этот код - https://gist.github.com/sunilmallya/4662837, который сервер сокетов multiprocessing.reduction с родительской обработкой, передающий соединения клиенту после приема соединений

0 голосов
/ 17 декабря 2010

Похоже, вам нужно передавать файловые дескрипторы между процессами (предполагая, что здесь Unix, понятия не имею о Windows). Я никогда не делал этого в Python, но здесь есть ссылка на проект python-passfd , который вы, возможно, захотите проверить.

...