Мы все знаем, что использование asyncio существенно повышает производительность сокет-сервера, и, очевидно, все становится еще лучше, если мы сможем использовать все ядра нашего процессора (возможно, через многопроцессорный модуль или os.fork()
и т. Д.)
Сейчас я пытаюсь создать демонстрационную версию многоядерного сервера сокетов с асинхронным сервером сокетов, прослушивающим каждое ядро и привязывающим все к одному порту. Просто создав асинхронный сервер и затем используйте os.fork()
, чтобы процессы работали на конкурентной основе.
Однако, когда я пытаюсь раскошелиться, у кода с одним ядром все в порядке. Похоже, что есть проблема с регистрацией одинаковых файловых дескрипторов из разных процессов в модуле выбора epoll.
Я показываю код ниже, кто-нибудь может мне помочь?
Вот простой, логически понятный код эхо-сервера с использованием asyncio:
import os
import asyncio #,uvloop
from socket import *
# hendler sends back incoming message directly
async def handler(loop, client):
with client:
while True:
data = await loop.sock_recv(client, 64)
if not data:
break
await loop.sock_sendall(client, data)
# create tcp server
async def create_server(loop):
sock = socket(AF_INET ,SOCK_STREAM)
sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
sock.bind(('',25000))
sock.listen()
sock.setblocking(False)
return sock
# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
while True:
client ,addr = await loop.sock_accept(sock)
loop.create_task(handler(loop ,client))
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()
Работает нормально, пока я не пытаюсь форкнуть, после того, как сокет был восстановлен и до того, как сервер начнет обслуживать (Эта логика отлично работает в коде с синхронной многопоточностью.)
Когда я пытаюсь это:
loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
pid = os.fork()
if pid <= 0: # fork process as the same number as
break # my cpu cores
loop.create_task(serving(loop, sock))
loop.run_forever()
Теоретически разветвленный процесс привязан к одному и тому же сокету? И запустить в том же цикле событий? тогда работать нормально?
Однако я получаю следующие сообщения об ошибках:
Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
key = self._selector.get_key(fd)
File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/test/temp1.py", line 23, in serving
client ,addr = await loop.sock_accept(sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
self._sock_accept(fut, False, sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
self.add_reader(fd, self._sock_accept, fut, True, sock)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
return self._add_reader(fd, callback, *args)
File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
(handle, None))
File "/usr/local/lib/python3.7/selectors.py", line 359, in register
self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists
Python версия 3.7.3,
Я совершенно не понимаю, что происходит.
Может ли кто-нибудь помочь? спасибо