Для любого новичка в AsyncIO, имеющего ту же проблему, я сам нашел решение.
Прежде всего, лучше использовать аспекты высокого уровня AsyncIO - streams
.Вызов loop.create_connction
и loop.create_server
считается низкоуровневым (что я сначала понял неправильно).
Высокоуровневой альтернативой create_connection
является asyncio.open_connection
, которая предоставит вам кортеж, состоящий из asyncio.StreamReader
и asyncio.StreamWriter
, который вы можете использовать для чтения и записи в открытое соединение.Вы также можете обнаружить потерю соединения, когда данные, считанные из StreamReader
, равны b''
или когда вы перехватываете исключение (ConnectionError
) при попытке записи в StreamWriter
.
Высокоуровневой альтернативой create_server
является asyncio.start_server
, которая должна быть снабжена функцией обратного вызова, которая будет вызываться каждый раз, когда устанавливается соединение с сервером (открытое соединение, полученные данные ...).Обратный вызов имеет StreamReader
и StreamWriter
в качестве аргументов.Потеря соединения также может быть обнаружена при получении b''
или ConnectionError
при записи в writer
.
С помощью сопрограмм можно обработать несколько соединений.Может быть сопрограмма для серверной части (которая принимает соединение от одного из соседей в топологии кольца) и сопрограмма для клиентской части (которая открывает соединение с другим соседом в кольце).Класс Node
может выглядеть следующим образом:
import asyncio
class Node:
...
async def run(self):
...
self.next_reader, self.next_writer = await asyncio.open_connection(self.next_IP, self.next_port)
server_coro = asyncio.create_task(self.server_init())
client_coro = asyncio.create_task(self.client_method())
await client_coro
await server_coro
...
async def server_init(self):
server = await asyncio.start_server(self.server_callback, self.IP, self.port)
async with server:
await server.serve_forever()
async def client_method(self):
...
try:
data = await self.next_reader.read()
except ConnectionError:
...
...
Обратите внимание, что я использую asyncio.create_task
для сопрограмм и (не здесь в списке кода) asyncio.run(node.run())
, которые считаются альтернативами высокого уровняasyncio.ensure_future()
и loop.run_forever()
.Оба из них были добавлены в Python 3.7, и asyncio.run()
, как говорят, является предварительным, так что к тому времени, когда вы прочитаете это, это уже могло быть заменено чем-то другим.
Я не эксперт AsyncIO,так что может быть лучший, более чистый способ сделать это (если вы знаете это, пожалуйста, поделитесь им).