Внедрить максимальное количество одновременных вызовов для каждой функции на сервер Python XML -RP C - PullRequest
0 голосов
/ 03 марта 2020

У меня есть Python на основе SimpleXMLRPCServer, подобный этому:

from multiprocessing import Process
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
import SocketServer

class RPCThreading(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
    pass


# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
    rpc_paths = ('/RPC2',)


def main():
    server = RPCThreading(('127.0.0.1', 8000), requestHandler=RequestHandler)
    server.register_function(tester1)
    server.register_function(tester2)

    print("Server running...")
    server.serve_forever()


def tester1(id):
    p = Process(target=my_func1, args=(id,))
    p.start()

    return True

def tester2(id):
    p = Process(target=my_func2, args=(id,))
    p.start()

    return True

Я хочу реализовать метод, чтобы отслеживать, сколько параллельных процессов в настоящее время выполняется для tester1 и tester2, и если их максимальное количество (определяемое пользователем) все еще выполняется, то ставьте в очередь каждый новый запрос и выполняйте, когда число падает ниже порогового значения.

Может быть, общее Pool для каждая функция?

1 Ответ

0 голосов
/ 13 марта 2020

Следующее, кажется, делает то, что я просил:

from multiprocessing import Process, JoinableQueue
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
import SocketServer
import threading

tester1_queue = JoinableQueue()
tester2_queue = JoinableQueue()
tester1_max_concurrent = 10
tester2_max_concurrent = 10

class RPCThreading(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
    pass


# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
    rpc_paths = ('/RPC2',)


def main():
    # spin up tester1 queue watcher threads
    for i in range(tester1_max_concurrent):
        worker = threading.Thread(target=tester1_queue_watcher, args=(tester1_queue,))
        worker.daemon = True
        worker.start()

    # spin up tester2 queue watcher threads
    for i in range(tester2_max_concurrent):
        worker = threading.Thread(target=tester2_queue_watcher, args=(tester2_queue,))
        worker.daemon = True
        worker.start()

    server = RPCThreading(('127.0.0.1', 8000), requestHandler=RequestHandler)
    server.register_function(tester1_handler, 'tester1')
    server.register_function(tester2_handler, 'tester2' )

    print("Server running...")
    server.serve_forever()

def tester1_handler(id):
    tester1_queue.put((id,))
    return True

def tester1_queue_watcher(q):
    while True:
        id = q.get()
        p = Process(target=tester1, args=(id,))
        p.start()
        p.join()
        q.task_done()

def tester1(id):
    # do stuff

def tester2_handler(id):
    tester2_queue.put((id,))
    return True

def tester2_queue_watcher(q):
    while True:
        id = q.get()
        p = Process(target=tester2, args=(id,))
        p.start()
        p.join()
        q.task_done()

def tester2(id):
    # do stuff
...