У меня есть два сервера, созданные с помощью asyncio.start_server:
asyncio.start_server(self.handle_connection, host = host, port = port)
и работает в одном цикле:
loop.run_until_complete(asyncio.gather(server1, server2))
loop.run_forever()
Я использую asyncio.Queue для связи между серверами. Сообщения от Сервера2, добавленные через queue.put(msg)
, успешно принимаются queue.get()
на Сервере1. Я бегу queue.get()
на asyncio.ensure_future
и использую в качестве обратного вызова для
add_done_callback
метод с сервера1:
def callback(self, future):
msg = future.result()
self.msg = msg
Но этот callback
не работает должным образом - self.msg не обновляется. Что я делаю не так?
ОБНОВЛЕНО
с дополнительным кодом, чтобы показать максимально полный пример:
class Queue(object):
def __init__(self, loop, maxsize: int):
self.instance = asyncio.Queue(loop = loop, maxsize = maxsize)
async def put(self, data):
await self.instance.put(data)
async def get(self):
data = await self.instance.get()
self.instance.task_done()
return data
@staticmethod
def get_instance():
return Queue(loop = asyncio.get_event_loop(), maxsize = 10)
Класс сервера:
class BaseServer(object):
def __init__(self, host, port):
self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
pass
def get_instance(self):
return self.instance
@staticmethod
def create():
return BaseServer(None, None)
Далее я запускаю серверы:
loop.run_until_complete(asyncio.gather(server1.get_instance(), server2.get_instance()))
loop.run_forever()
В handle_connection
на сервере2 я звоню queue.put(msg)
, в handle_connection
на сервере1 я зарегистрирован queue.get()
как задача:
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
Метод process_queue
сервера1:
def process_queue(self, future):
msg = future.result()
self.msg = msg
Метод handle_connection
для сервера1:
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
while self.msg != SPECIAL_VALUE:
# doing something
Хотя task_queue
сделано, self.process_queue
вызвано, self.msg
никогда не обновляется.