Простой сервер TCP-разветвления с использованием asyncio - PullRequest
5 голосов
/ 04 мая 2019

Что я пытаюсь сделать

Я пытаюсь эмулировать поведение следующей простой команды socat(1):

socat tcp-listen:SOME_PORT,fork,reuseaddr exec:'SOME_PROGRAM'

Приведенная выше команда создает разветвленный TCP-сервер, который разветвляется и выполняет SOME_PROGRAM для каждого соединения, перенаправляя stdin и stdout указанной команды в сокет TCP.

Вот чего я хотел бы достичь :

  1. Создайте простой TCP-сервер с asyncio для обработки нескольких одновременных подключений.
  2. При получении соединения запускайте SOME_PROGRAM как подпроцесс.
  3. Передача любых данных, полученных из сокета, на стандартный вход SOME_PROGRAM.
  4. Передача любых данных, полученных со стандартного вывода SOME_PROGRAM, в сокет.
  5. Когда выйдет SOME_PROGRAM, напишите прощальное сообщение вместе с кодом выхода в сокет и закройте соединение.

Я хотел бы сделать это на чистом Python, без использования внешних библиотек с использованием модуля asyncio.

Что у меня до сих пор

Вот код, который я написал до сих пор (не пугайтесь, если он длинный, просто сильно прокомментирован и размечен):

import asyncio

class ServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.client_addr   = transport.get_extra_info('peername')
        self.transport     = transport
        self.child_process = None

        print('Connection with {} enstablished'.format(self.client_addr))

        asyncio.ensure_future(self._create_subprocess())

    def connection_lost(self, exception):
        print('Connection with {} closed.'.format(self.client_addr))

        if self.child_process.returncode is not None:
            self.child_process.terminate()

    def data_received(self, data):
        print('Data received: {!r}'.format(data))

        # Make sure the process has been spawned
        # Does this even make sense? Looks so awkward to me...
        while self.child_process is None:
            continue

        # Write any received data to child_process' stdin
        self.child_process.stdin.write(data)

    async def _create_subprocess(self):
        self.child_process = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE
        )

        # Start reading child stdout
        asyncio.ensure_future(self._pipe_child_stdout())

        # Ideally I would register some callback here so that when
        # child_process exits I can write to the socket a goodbye
        # message and close the connection, but I don't know how
        # I could do that...

    async def _pipe_child_stdout(self):
        # This does not seem to work, this function returns b'', that is an
        # empty buffer, AFTER the process exits...
        data = await self.child_process.stdout.read(100) # Arbitrary buffer size

        print('Child process data: {!r}'.format(data))

        if data:
            # Send to socket
            self.transport.write(data)
            # Reschedule to read more data
            asyncio.ensure_future(self._pipe_child_stdout())


SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ServerProtocol, '0.0.0.0', SERVER_PORT)
    server = loop.run_until_complete(coro)

    print('Serving on {}'.format(server.sockets[0].getsockname()))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

А также программа ./test, которую я пытаюсь запустить как подпроцесс:

#!/usr/bin/env python3

import sys

if sys.stdin.read(2) == 'a\n':
    sys.stdout.write('Good!\n')
else:
    sys.exit(1)

if sys.stdin.read(2) == 'b\n':
    sys.stdout.write('Wonderful!\n')
else:
    sys.exit(1)

sys.exit(0)

К сожалению, приведенный выше код на самом деле не работает, и я немного растерялся из-за того, что попробовать дальше.

Что работает как задумано :

  • Дочерний процесс корректно создается, и, кажется, также правильно получает входные данные из сокета, потому что я вижу его из htop, и я также вижу, как только я отправляю b\n это заканчивается.

Что не работает, как задумано :

В основном все остальное ...

  • Вывод дочернего процесса никогда не отправляется в сокет и фактически никогда не читается вообще. Вызов await self.child_process.stdout.read(100), кажется, никогда не завершается: вместо этого он завершается только после дочерний процесс умирает, и в результате получается просто b'' (пустой bytes объект).
  • Я не могу понять, когда завершается дочерний процесс: как я упоминал выше, я хотел бы отправить сообщение "Goodbye" в сокет вместе с self.child_process.returncode, когда это произойдет, но я не знаю, как сделать это так, чтобы это имело смысл.

Что я пробовал :

  • Я попытался создать дочерний процесс, используя asyncio.loop.subprocess_exec() вместо asyncio.create_subprocess_exec(). Это решает проблему знания того, когда процесс завершается, поскольку я могу создать экземпляр подкласса asyncio.SubprocessProtocol и использовать его метод process_exited(), , но не делает это мне совсем не поможет, поскольку, если я сделаю это таким образом, у меня не будет возможности поговорить с процессом "stdin или stdout!" То есть у меня нет Process объекта для взаимодействия с ...
  • Я попытался поиграть с asyncio.loop.connect_write_pipe() и loop.connect_read_pipe() без удачи.

Вопросы

Итак, кто-нибудь может помочь мне понять, что я делаю не так? Должен быть способ сделать это гладко. Когда я только начинал, я искал способ просто без особых усилий использовать перенаправление трубы, но я не знаю, возможно ли это вообще на этом этапе. Это? Похоже, так и должно быть.

Я мог бы написать эту программу на C за 15 минут, используя fork(), exec() и dup2(), поэтому я кое-что должен упустить! Любая помощь приветствуется.

1 Ответ

5 голосов
/ 04 мая 2019

Ваш код имеет две непосредственные проблемы с реализацией:

  • Сервер удаляет пробелы из полученных данных перед их передачей в подпроцесс.Это удаляет завершающий символ новой строки, поэтому, если TCP-клиент отправляет "a\n", подпроцесс получит только "a".Таким образом, подпроцесс никогда не встречает ожидаемую строку "a\n" и всегда завершается после чтения двух байтов.Это объясняет пустую строку (EOF), поступающую из подпроцесса. (Разбор был удален при последующем редактировании вопроса.)
  • Подпроцесс не сбрасывает свой вывод, поэтому сервер не 'не получает ни одной из своих записей.Записи видны только при выходе из подпроцесса или при заполнении выходного буфера, который измеряется в килобайтах и ​​занимает некоторое время для заполнения при отображении коротких сообщений отладки.

Другая проблема связана суровень дизайна.Как упоминалось в комментариях, если ваше явное намерение не заключается в реализации нового асинхронного протокола, рекомендуется придерживаться высокоуровневого потокового API , в данном случае start_server функция.Функциональность даже более низкого уровня, например SubprocessProtocol, connect_write_pipe и connect_read_pipe, также не является тем, что вы хотели бы использовать в коде приложения.Остальная часть этого ответа предполагает реализацию на основе потока.

start_server принимает сопрограмму, которая будет порождаться как новая задача при каждом подключении клиента.Он вызывается с двумя аргументами asyncio stream, один для чтения и один для записи.Сопрограмма содержит логику общения с клиентом;в вашем случае это вызовет подпроцесс и передаст данные между ним и клиентом.

Обратите внимание, что двунаправленная передача данных между сокетом и подпроцессом не может быть достигнута с помощью простого цикла с чтениями, за которыми следуют записи.Например, рассмотрим этот цикл:

# INCORRECT: can deadlock (and also doesn't detect EOF)
child = await asyncio.create_subprocess_exec(...)
while True:
    proc_data = await child.stdout.read(1024)  # (1)
    sock_writer.write(proc_data)
    sock_data = await sock_reader.read(1024)
    child.stdin.write(sock_data)               # (2)

Этот тип цикла подвержен тупикам.Если подпроцесс отвечает на данные, которые он получает от TCP-клиента, он иногда будет предоставлять выходные данные только после того, как получит какой-либо ввод.Это заблокирует цикл в (1) на неопределенный срок, потому что он может получить данные из дочернего элемента stdout только после отправки дочернему элементу sock_data, что происходит позже, в (2).По сути, (1) ожидает (2) и наоборот, образуя тупик.Обратите внимание, что изменение порядка передачи не поможет, потому что тогда цикл будет тупиковым, если TCP-клиент обрабатывает выходные данные подпроцесса сервера.

Поскольку в нашем распоряжении asyncio, такого тупика легко избежать: простопорождает две сопрограммы параллельно , одна из которых передает данные из сокета в стандартный поток подпроцесса, а другая передает данные из стандартного stdout в сокет.Например:

# correct: deadlock-free (and detects EOF)
async def _transfer(src, dest):
    while True:
        data = await src.read(1024)
        if data == b'':
            break
        dest.write(data)

child = await asyncio.create_subprocess_exec(...)
loop.create_task(_transfer(child.stdout, sock_writer))
loop.create_task(_transfer(sock_reader, child.stdin))
await child.wait()

Разница между этой настройкой и первым циклом while заключается в том, что эти две передачи независимы друг от друга.Не может возникнуть взаимоблокировка, поскольку чтение из сокета никогда не ожидает чтения из подпроцесса и наоборот.

Применительно к вопросу весь сервер будет выглядеть так:

import asyncio

class ProcServer:
    async def _transfer(self, src, dest):
        while True:
            data = await src.read(1024)
            if data == b'':
                break
            dest.write(data)

    async def _handle_client(self, r, w):
        loop = asyncio.get_event_loop()
        print(f'Connection from {w.get_extra_info("peername")}')
        child = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM, stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)
        sock_to_child = loop.create_task(self._transfer(r, child.stdin))
        child_to_sock = loop.create_task(self._transfer(child.stdout, w))
        await child.wait()
        sock_to_child.cancel()
        child_to_sock.cancel()
        w.write(b'Process exited with status %d\n' % child.returncode)
        w.close()

    async def start_serving(self):
        await asyncio.start_server(self._handle_client,
                                   '0.0.0.0', SERVER_PORT)

SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    server = ProcServer()
    loop.run_until_complete(server.start_serving())
    loop.run_forever()

Сопровождающая программа test также должна быть модифицирована так, чтобы вызывать sys.stdout.flush() после каждого sys.stdout.write(), в противном случае сообщения задерживаются в своих буферах stdio вместо того, чтобы отправляться родителю.

Когда я только начинал, я искал способ просто без особых усилий использовать перенаправление трубы, но я не знаю, возможно ли это вообще на этом этапе.Это?Похоже, что так и должно быть.

В Unix-подобных системах, безусловно, возможно перенаправить сокет в порожденный подпроцесс, чтобы подпроцесс напрямую общался с клиентом.(Старый inetd сервер Unix работает следующим образом.) Но этот режим работы не поддерживается asyncio по двум причинам:

  • он работает не на всех системахподдерживается Python и asyncio, в частности в Windows.
  • он несовместим с основными функциями asyncio, такими как транспорты / протоколы и потоки, которые предполагают владение и эксклюзивный доступ к базовым сокетам.

Даже если вы не заботитесь о переносимости, подумайте над вторым моментом: вам может потребоваться обработать или записать данные, которыми обмениваются TCP-клиент и подпроцесс, и вы не сможете этого сделать, если они соединены вместе. уровень ядра. Кроме того, тайм-ауты и отмену намного проще реализовать в сопрограммах asyncio, чем при работе только с непрозрачным подпроцессом.

Если непереносимость и неспособность управлять связью подходят для вашего случая использования, то, вероятно, вам, во-первых, не требуется asyncio - ничто не мешает вам создавать поток, который запускает классический сервер блокировки, который обрабатывает каждого клиента с той же последовательностью os.fork, os.dup2 и os.execlp, которую вы написали бы на C.

EDIT

Как указывает OP в комментарии, исходный код обрабатывает отключение TCP-клиента, убивая дочерний процесс. На уровне потока потеря соединения отражается потоком либо сообщая об окончании файла, либо вызывая исключение. В приведенном выше коде можно легко отреагировать на потерю соединения, заменив универсальный self._transfer() более специфической сопрограммой, которая обрабатывает этот случай. Например, вместо:

sock_to_child = loop.create_task(self._transfer(r, child.stdin))

... можно написать:

sock_to_child = loop.create_task(self._sock_to_child(r, child))

и определите _sock_to_child следующим образом (не проверено):

async def _sock_to_child(self, reader, child):
    try:
        await self._transfer(reader, child.stdin)
    except IOError as e:
        # IO errors are an expected part of the workflow,
        # we don't want to propagate them
        print('exception:', e)
    child.kill()

Если дочерний процесс переживает TCP-клиент, строка child.kill(), скорее всего, никогда не будет выполнена, поскольку сопрограмма будет отменена на _handle_client, пока она приостановлена ​​в src.read() внутри _transfer().

...