Я отправился в путь "Сколько производительности я могу выжать из Python веб-сервера?" Это привело меня к AIOHTTP и uvl oop. Тем не менее, я мог видеть, что AIOHTTP не использует мой процессор в полной мере. Я решил использовать многопроцессорность с AIOHTTP. Я узнал, что есть функция ядра Linux, которая позволяет нескольким процессам использовать один и тот же порт TCP. Это привело меня к разработке следующего кода (который прекрасно работает):
import asyncio
import os
import socket
import time
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)
def mk_socket(host="127.0.0.1", port=8000, reuseport=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if reuseport:
SO_REUSEPORT = 15
sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
sock.bind((host, port))
return sock
async def handle(request):
name = request.match_info.get('name', "Anonymous")
pid = os.getpid()
text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
time.time(), name, pid)
#time.sleep(5) # intentionally blocking sleep to simulate CPU load
return web.Response(text=text)
def start_server():
host = "127.0.0.1"
port=8000
reuseport = True
app = web.Application()
sock = mk_socket(host, port, reuseport=reuseport)
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
loop = asyncio.get_event_loop()
coro = loop.create_server(
protocol_factory=app.make_handler(),
sock=sock,
)
srv = loop.run_until_complete(coro)
loop.run_forever()
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
for i in range(0, CPU_COUNT):
executor.submit(start_server)
тест WRK моего сайта до применения этого кода:
Running 30s test @ http://127.0.0.1:8000/
12 threads and 400 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 54.33ms 6.54ms 273.24ms 89.95%
Req/Sec 608.68 115.97 2.27k 83.63%
218325 requests in 30.10s, 41.23MB read
Non-2xx or 3xx responses: 218325
Requests/sec: 7254.17
Transfer/sec: 1.37MB
тест WRK после:
Running 30s test @ http://127.0.0.1:8000/
12 threads and 400 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 15.96ms 7.27ms 97.29ms 84.78%
Req/Sec 2.11k 208.30 4.45k 75.50%
759290 requests in 30.08s, 153.51MB read
Requests/sec: 25242.39
Transfer/sec: 5.10MB
WoW! Но есть проблема:
DeprecationWarning: Application.make_handler(...) is deprecated, use AppRunner API instead
protocol_factory=app.make_handler()
Итак, я попробовал это:
import asyncio
import os
import socket
import time
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)
def mk_socket(host="127.0.0.1", port=8000, reuseport=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if reuseport:
SO_REUSEPORT = 15
sock.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
sock.bind((host, port))
return sock
async def handle(request):
name = request.match_info.get('name', "Anonymous")
pid = os.getpid()
text = "{:.2f}: Hello {}! Process {} is treating you\n".format(
time.time(), name, pid)
#time.sleep(5) # intentionally blocking sleep to simulate CPU load
return web.Response(text=text)
async def start_server():
host = "127.0.0.1"
port=8000
reuseport = True
app = web.Application()
sock = mk_socket(host, port, reuseport=reuseport)
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
coro = loop.create_server(
protocol_factory=app.make_handler(),
sock=sock,
)
runner = web.AppRunner(app)
await runner.setup()
srv = web.TCPSite(runner, 'localhost', 8000)
await srv.start()
print('Server started at http://127.0.0.1:8000')
return coro, app, runner
async def finalize(srv, app, runner):
sock = srv.sockets[0]
app.loop.remove_reader(sock.fileno())
sock.close()
#await handler.finish_connections(1.0)
await runner.cleanup()
srv.close()
await srv.wait_closed()
await app.finish()
def init():
loop = asyncio.get_event_loop()
srv, app, runner = loop.run_until_complete(init)
try:
loop.run_forever()
except KeyboardInterrupt:
loop.run_until_complete((finalize(srv, app, runner)))
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
for i in range(0, CPU_COUNT):
executor.submit(init)
, который явно неполон, поскольку coro
не используется. Я не уверен, где интегрировать сокет с AppRunner. Ответ должен показывать оригинальный пример, измененный для использования App Runner.