Ну, нет ответов, так что я думаю, что нет заранее подготовленного способа сделать это.Для потомков я накатил свое собственное решение, похожее на эту придуманную версию.
Итак, поехали.Сценарий, который 1) запускает сервер отдыха, 2) порождает внешних клиентов, которые делают вещи и отчитываются перед (1), 3) выключает сервер, когда все данные получены и продолжаются.
По сути, так как этоСценарий будет запускаться из командной строки, он должен использовать временный порт (в противном случае несколько человек, выполняющих его одновременно, портируют конфликт). Сервер - это просто http.server в потоке, который имеет несколько обратных вызовов для захвата событий, ииспользует потокобезопасную очередь. Очередь для передачи событий, происходящих на сервере, в основной поток.Это позволяет нам узнать, когда нужно выключить сервер.
import json
import threading
import http.server
import logging
from queue import Queue
q = Queue()
EPHEMERAL_PORT = 0
num_events = 0
MAX_EVENTS = 3
class TinyRESTHandler(http.server.BaseHTTPRequestHandler):
def __init__(self, service_map, *args):
self.service_map = service_map
http.server.BaseHTTPRequestHandler.__init__(self, *args)
def respond(self, code, message):
self.send_response(code)
self.send_header("Content-Type", "text/ascii")
self.send_header("Content-Length", str(len(message.encode())))
self.end_headers()
self.wfile.write(message.encode())
def handle_POST(self, json_handler):
"""
Route POST requests to the appropriate handler.
"""
payload = self.rfile.read(int(self.headers['Content-Length']))
try:
json_payload = json.loads(payload)
json_handler(self.path, json_payload)
except json.decoder.JSONDecodeError as e:
self.respond(400, "Bad Request: Invalid JSON")
self.respond(200, "OK")
def do_POST(self):
if (self.path in self.service_map):
self.handle_POST(self.service_map[self.path])
else:
self.respond(404, "Not Found")
class EphemeralHTTPServer(http.server.HTTPServer):
"""
We cannot know the port used by an Ephemeral HTTP server until
it has tried to bind a port (at which point the OS gives it a
free port.) This adds a callback to the bind function that allows
us to be notified as soon as a port has been obtained.
"""
def __init__(self, hostname, port_notify_cb, *args, **kwargs):
self.port_notify_cb = port_notify_cb
super().__init__((hostname, EPHEMERAL_PORT), *args, **kwargs)
def server_bind(self):
"""
The server will notify port_notify_cb of its address
once it has bound a port.
"""
super().server_bind()
if (self.port_notify_cb):
self.port_notify_cb(self.server_address)
class TinyRESTServer():
def __init__(self, host, port):
self.host = host
self.port = port
self.service_map = dict()
def register_service(self, path, callback):
self.service_map[path] = callback
def get_handler(self, *args):
"""
HTTPServer creates a new handler for every request. This ensures
that the TinyRESTHandlers are supplied with the service map.
"""
return TinyRESTHandler(self.service_map, *args)
def getHTTPServer(self):
return http.server.HTTPServer((self.host, self.port), self.get_handler)
def run(self):
"""
The server_close call forces HTTPServer to relinquish its port.
"""
self.server = self.getHTTPServer()
try:
self.server.serve_forever()
finally:
self.server.server_close()
def shutdown(self):
self.server.shutdown()
class EphemeralRESTServer(TinyRESTServer):
def __init__(self, host, address_cb):
self.address_cb = address_cb
super().__init__(host, 0)
def getHTTPServer(self):
return EphemeralHTTPServer(self.host, self.address_cb, self.get_handler)
class ServerEvent:
def __init__(self, name):
self.name = name
class PortAcquiredEvent(ServerEvent):
def __init__(self, hostname, port):
super().__init__("port acquired")
self.hostname = hostname
self.port = port
def __str__(self):
return f"{self.name}: (host, port) = ({self.hostname}, {self.port})"
class JSONEvent(ServerEvent):
def __init__(self, json_content):
super().__init__("JSON results")
self.json_content = json_content
def __str__(self):
return f"{self.name}: {self.json_content}"
def get_server_address(server_address):
"""
When the server binds an ephemeral port, it will call this
function to tell us what port the OS provided. Using a queue
ensures that the main prog doesn't try to get the port before
the HTTP server in the thread has successfully obtained one.
"""
q.put(PortAcquiredEvent(server_address[0], server_address[1]))
def add_to_queue(req_type, json_content):
"""
Contrived REST service handler.
"""
q.put(JSONEvent(json_content))
def check_if_we_should_stop_the_server(event):
"""
Contrived function to test when we should stop the http server
and do something with the received data.
"""
global num_events
global MAX_EVENTS
print(event)
num_events += 1
return num_events < MAX_EVENTS
# Start an HTTP server, in a thread, on port 0.
server = EphemeralRESTServer("localhost", get_server_address)
server.register_service('/post_server_info', add_to_queue)
server_thread = threading.Thread(None, server.run)
server_thread.start()
"""
Do something here to cause rest clients to start hitting this
server (for example invoking the clients via subprocess or
whatevs.)
"""
# Block until the queue obtains a value.
cur_val = q.get()
while check_if_we_should_stop_the_server(cur_val):
cur_val = q.get()
# Stop the HTTP server.
server.shutdown()
server_thread.join()
# Do normal script stuff with the data...
Все началось "крошечно", хорошо?Отсюда и название.