Ваш код имеет две непосредственные проблемы с реализацией:
- Сервер удаляет пробелы из полученных данных перед их передачей в подпроцесс.Это удаляет завершающий символ новой строки, поэтому, если TCP-клиент отправляет
"a\n"
, подпроцесс получит только "a"
.Таким образом, подпроцесс никогда не встречает ожидаемую строку "a\n"
и всегда завершается после чтения двух байтов.Это объясняет пустую строку (EOF), поступающую из подпроцесса. (Разбор был удален при последующем редактировании вопроса.) - Подпроцесс не сбрасывает свой вывод, поэтому сервер не 'не получает ни одной из своих записей.Записи видны только при выходе из подпроцесса или при заполнении выходного буфера, который измеряется в килобайтах и занимает некоторое время для заполнения при отображении коротких сообщений отладки.
Другая проблема связана суровень дизайна.Как упоминалось в комментариях, если ваше явное намерение не заключается в реализации нового асинхронного протокола, рекомендуется придерживаться высокоуровневого потокового API , в данном случае start_server
функция.Функциональность даже более низкого уровня, например SubprocessProtocol
, connect_write_pipe
и connect_read_pipe
, также не является тем, что вы хотели бы использовать в коде приложения.Остальная часть этого ответа предполагает реализацию на основе потока.
start_server
принимает сопрограмму, которая будет порождаться как новая задача при каждом подключении клиента.Он вызывается с двумя аргументами asyncio stream, один для чтения и один для записи.Сопрограмма содержит логику общения с клиентом;в вашем случае это вызовет подпроцесс и передаст данные между ним и клиентом.
Обратите внимание, что двунаправленная передача данных между сокетом и подпроцессом не может быть достигнута с помощью простого цикла с чтениями, за которыми следуют записи.Например, рассмотрим этот цикл:
# INCORRECT: can deadlock (and also doesn't detect EOF)
child = await asyncio.create_subprocess_exec(...)
while True:
proc_data = await child.stdout.read(1024) # (1)
sock_writer.write(proc_data)
sock_data = await sock_reader.read(1024)
child.stdin.write(sock_data) # (2)
Этот тип цикла подвержен тупикам.Если подпроцесс отвечает на данные, которые он получает от TCP-клиента, он иногда будет предоставлять выходные данные только после того, как получит какой-либо ввод.Это заблокирует цикл в (1) на неопределенный срок, потому что он может получить данные из дочернего элемента stdout
только после отправки дочернему элементу sock_data
, что происходит позже, в (2).По сути, (1) ожидает (2) и наоборот, образуя тупик.Обратите внимание, что изменение порядка передачи не поможет, потому что тогда цикл будет тупиковым, если TCP-клиент обрабатывает выходные данные подпроцесса сервера.
Поскольку в нашем распоряжении asyncio, такого тупика легко избежать: простопорождает две сопрограммы параллельно , одна из которых передает данные из сокета в стандартный поток подпроцесса, а другая передает данные из стандартного stdout в сокет.Например:
# correct: deadlock-free (and detects EOF)
async def _transfer(src, dest):
while True:
data = await src.read(1024)
if data == b'':
break
dest.write(data)
child = await asyncio.create_subprocess_exec(...)
loop.create_task(_transfer(child.stdout, sock_writer))
loop.create_task(_transfer(sock_reader, child.stdin))
await child.wait()
Разница между этой настройкой и первым циклом while
заключается в том, что эти две передачи независимы друг от друга.Не может возникнуть взаимоблокировка, поскольку чтение из сокета никогда не ожидает чтения из подпроцесса и наоборот.
Применительно к вопросу весь сервер будет выглядеть так:
import asyncio
class ProcServer:
async def _transfer(self, src, dest):
while True:
data = await src.read(1024)
if data == b'':
break
dest.write(data)
async def _handle_client(self, r, w):
loop = asyncio.get_event_loop()
print(f'Connection from {w.get_extra_info("peername")}')
child = await asyncio.create_subprocess_exec(
*TARGET_PROGRAM, stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE)
sock_to_child = loop.create_task(self._transfer(r, child.stdin))
child_to_sock = loop.create_task(self._transfer(child.stdout, w))
await child.wait()
sock_to_child.cancel()
child_to_sock.cancel()
w.write(b'Process exited with status %d\n' % child.returncode)
w.close()
async def start_serving(self):
await asyncio.start_server(self._handle_client,
'0.0.0.0', SERVER_PORT)
SERVER_PORT = 6666
TARGET_PROGRAM = ['./test']
if __name__ == '__main__':
loop = asyncio.get_event_loop()
server = ProcServer()
loop.run_until_complete(server.start_serving())
loop.run_forever()
Сопровождающая программа test
также должна быть модифицирована так, чтобы вызывать sys.stdout.flush()
после каждого sys.stdout.write()
, в противном случае сообщения задерживаются в своих буферах stdio вместо того, чтобы отправляться родителю.
Когда я только начинал, я искал способ просто без особых усилий использовать перенаправление трубы, но я не знаю, возможно ли это вообще на этом этапе.Это?Похоже, что так и должно быть.
В Unix-подобных системах, безусловно, возможно перенаправить сокет в порожденный подпроцесс, чтобы подпроцесс напрямую общался с клиентом.(Старый inetd
сервер Unix работает следующим образом.) Но этот режим работы не поддерживается asyncio по двум причинам:
- он работает не на всех системахподдерживается Python и asyncio, в частности в Windows.
- он несовместим с основными функциями asyncio, такими как транспорты / протоколы и потоки, которые предполагают владение и эксклюзивный доступ к базовым сокетам.
Даже если вы не заботитесь о переносимости, подумайте над вторым моментом: вам может потребоваться обработать или записать данные, которыми обмениваются TCP-клиент и подпроцесс, и вы не сможете этого сделать, если они соединены вместе. уровень ядра. Кроме того, тайм-ауты и отмену намного проще реализовать в сопрограммах asyncio, чем при работе только с непрозрачным подпроцессом.
Если непереносимость и неспособность управлять связью подходят для вашего случая использования, то, вероятно, вам, во-первых, не требуется asyncio - ничто не мешает вам создавать поток, который запускает классический сервер блокировки, который обрабатывает каждого клиента с той же последовательностью os.fork
, os.dup2
и os.execlp
, которую вы написали бы на C.
EDIT
Как указывает OP в комментарии, исходный код обрабатывает отключение TCP-клиента, убивая дочерний процесс. На уровне потока потеря соединения отражается потоком либо сообщая об окончании файла, либо вызывая исключение. В приведенном выше коде можно легко отреагировать на потерю соединения, заменив универсальный self._transfer()
более специфической сопрограммой, которая обрабатывает этот случай. Например, вместо:
sock_to_child = loop.create_task(self._transfer(r, child.stdin))
... можно написать:
sock_to_child = loop.create_task(self._sock_to_child(r, child))
и определите _sock_to_child
следующим образом (не проверено):
async def _sock_to_child(self, reader, child):
try:
await self._transfer(reader, child.stdin)
except IOError as e:
# IO errors are an expected part of the workflow,
# we don't want to propagate them
print('exception:', e)
child.kill()
Если дочерний процесс переживает TCP-клиент, строка child.kill()
, скорее всего, никогда не будет выполнена, поскольку сопрограмма будет отменена на _handle_client
, пока она приостановлена в src.read()
внутри _transfer()
.