Не удалось использовать os.fork () привязать несколько процессов к одному серверу сокетов при использовании asyncio - PullRequest
2 голосов
/ 01 июня 2019

Мы все знаем, что использование asyncio существенно повышает производительность сокет-сервера, и, очевидно, все становится еще лучше, если мы сможем использовать все ядра нашего процессора (возможно, через многопроцессорный модуль или os.fork() и т. Д.)

Сейчас я пытаюсь создать демонстрационную версию многоядерного сервера сокетов с асинхронным сервером сокетов, прослушивающим каждое ядро ​​и привязывающим все к одному порту. Просто создав асинхронный сервер и затем используйте os.fork(), чтобы процессы работали на конкурентной основе.

Однако, когда я пытаюсь раскошелиться, у кода с одним ядром все в порядке. Похоже, что есть проблема с регистрацией одинаковых файловых дескрипторов из разных процессов в модуле выбора epoll.

Я показываю код ниже, кто-нибудь может мне помочь?


Вот простой, логически понятный код эхо-сервера с использованием asyncio:

import os
import asyncio #,uvloop
from socket import *

# hendler sends back incoming message directly
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET ,SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET , SO_REUSEADDR ,1)
    sock.bind(('',25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request, create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client ,addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop ,client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

Работает нормально, пока я не пытаюсь форкнуть, после того, как сокет был восстановлен и до того, как сервер начнет обслуживать (Эта логика отлично работает в коде с синхронной многопоточностью.)


Когда я пытаюсь это:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop.create_task(serving(loop, sock))
loop.run_forever()

Теоретически разветвленный процесс привязан к одному и тому же сокету? И запустить в том же цикле событий? тогда работать нормально?

Однако я получаю следующие сообщения об ошибках:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python версия 3.7.3,

Я совершенно не понимаю, что происходит.

Может ли кто-нибудь помочь? спасибо

1 Ответ

1 голос
/ 01 июня 2019

Согласно проблеме с трекером , не поддерживается разветвление существующего цикла событий asyncio и попытка использовать его из нескольких процессов. Однако, согласно комментарию Юрия по той же проблеме, многопроцессорная обработка может быть реализована путем разветвления перед началом цикла, поэтому в каждом дочернем элементе выполняется полностью независимые асинхронные циклы.

Ваш код фактически подтверждает эту возможность: хотя create_server равно async def, он ничего не ждет и не использует аргумент loop. Таким образом, мы можем реализовать подход Юрия, сделав create_server обычной функцией, удалив аргумент loop и вызвав его до os.fork(), и запустив только циклы событий после разветвления:

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever accept a request ,create a handler task in eventloop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork process as the same number as 
        break               # my cpu cores

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()
...