Python3 asyncio - обратный вызов для add_done_callback не обновляет собственную переменную в классе сервера - PullRequest
0 голосов
/ 06 ноября 2018

У меня есть два сервера, созданные с помощью asyncio.start_server: asyncio.start_server(self.handle_connection, host = host, port = port) и работает в одном цикле:

loop.run_until_complete(asyncio.gather(server1, server2))
loop.run_forever()

Я использую asyncio.Queue для связи между серверами. Сообщения от Сервера2, добавленные через queue.put(msg), успешно принимаются queue.get() на Сервере1. Я бегу queue.get() на asyncio.ensure_future и использую в качестве обратного вызова для add_done_callback метод с сервера1:

def callback(self, future):
    msg = future.result()
    self.msg = msg

Но этот callback не работает должным образом - self.msg не обновляется. Что я делаю не так?

ОБНОВЛЕНО с дополнительным кодом, чтобы показать максимально полный пример:

class Queue(object):

    def __init__(self, loop, maxsize: int):
        self.instance = asyncio.Queue(loop = loop, maxsize = maxsize)

    async def put(self, data):
        await self.instance.put(data)

    async def get(self):
        data = await self.instance.get()
        self.instance.task_done()
        return data

    @staticmethod
    def get_instance():
        return Queue(loop = asyncio.get_event_loop(), maxsize = 10)

Класс сервера:

    class BaseServer(object):

        def __init__(self, host, port):
            self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)

        async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
            pass

        def get_instance(self):
            return self.instance

        @staticmethod
        def create():
            return BaseServer(None, None)

Далее я запускаю серверы:

loop.run_until_complete(asyncio.gather(server1.get_instance(), server2.get_instance()))
loop.run_forever()

В handle_connection на сервере2 я звоню queue.put(msg), в handle_connection на сервере1 я зарегистрирован queue.get() как задача:

 task_queue = asyncio.ensure_future(queue.get())
 task_queue.add_done_callback(self.process_queue)

Метод process_queue сервера1:

    def process_queue(self, future):
        msg = future.result()
        self.msg = msg

Метод handle_connection для сервера1:

 async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
     task_queue = asyncio.ensure_future(queue.get())
     task_queue.add_done_callback(self.process_queue)

     while self.msg != SPECIAL_VALUE:
        # doing something

Хотя task_queue сделано, self.process_queue вызвано, self.msg никогда не обновляется.

1 Ответ

0 голосов
/ 06 ноября 2018

В принципе, поскольку вы используете асинхронную структуру, я думаю, что вы можете напрямую ожидать результата:

async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
    msg = await queue.get()
    process_queue(msg)  # change it to accept real value instead of a future.
    # do something
...