Как запустить asyn c сопрограммы из init, подождите, пока она не будет завершена - PullRequest
1 голос
/ 06 марта 2020

Я подключаюсь к aioredis с __init__ (я не хочу его переносить, так как это означает, что мне нужно внести некоторые существенные изменения). Как я могу дождаться подключения aioredis task в приведенном ниже __init__ примере кода и заставить его печатать self.sub и self.pub объект? В настоящее время выдается ошибка:

ab c .py: 42> exception = AttributeError ("У объекта 'S' нет атрибута 'pub'")

Я вижу, что соединения redis созданы, и coro create_connetion сделано.

Есть ли способ блокировать вызовы asyncio от __init__. Если я заменю asyncio.wait на asyncio.run_until_complete, я получаю сообщение об ошибке, которое примерно говорит о том, что

l oop уже запущено.

asyncio.gather равно

import sys, json
from addict import Dict
import asyncio
import aioredis

class S():
    def __init__(self, opts):
        print(asyncio.Task.all_tasks())
        task = asyncio.wait(asyncio.create_task(self.create_connection()), return_when="ALL_COMPLETED")
        print(asyncio.Task.all_tasks())
        print(task)
        print(self.pub, self.sub)

    async def receive_message(self, channel):
        while await channel.wait_message():
            message = await channel.get_json()
            await asyncio.create_task(self.callback_loop(Dict(json.loads(message))))

    async def run_s(self):
        asyncio.create_task(self.listen())
        async def callback_loop(msg):
            print(msg)

        self.callback_loop = callback_loop

    async def create_connection(self):
        self.pub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
        self.sub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
        self.db = await aioredis.create_redis("redis://c8:7070/0", password="abc")
        self.listener = await self.sub.subscribe(f"abc")

    async def listen(self):
        self.tsk = asyncio.ensure_future(self.receive_message(self.listener[0]))
        await self.tsk

async def periodic(): #test function to show current tasks
    number = 5
    while True:
        await asyncio.sleep(number)
        print(asyncio.Task.all_tasks())

async def main(opts):
    loop.create_task(periodic())
    s = S(opts)
    print(s.pub, s.sub)
    loop.create_task(s.run_s())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    main_task = loop.create_task(main(sys.argv[1:]))
    loop.run_forever() #I DONT WANT TO MOVE THIS UNLESS IT IS NECESSARY

1 Ответ

0 голосов
/ 06 марта 2020

Я думаю, что вы хотите сделать, чтобы убедиться, что функция create_connections выполняется до завершения до конструктора S. Один из способов сделать это - немного изменить ваш код. Переместите функцию create_connections за пределы класса:

async def create_connection():
    pub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
    sub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
    db = await aioredis.create_redis("redis://c8:7070/0", password="abc")
    listener = await self.sub.subscribe(f"abc")
    return pub, sub, db, listener

Теперь дождитесь этой функции перед построением S. Поэтому ваша основная функция становится такой:

async def main(opts):
    loop.create_task(periodic())
    x = await create_connections()
    s = S(opts, x)  # pass the result of create_connections to S
    print(s.pub, s.sub)
    loop.create_task(s.run_s())

Теперь измените конструктор S, чтобы получать созданные объекты. :

def __init__(self, opts, x):
    self.pub, self.sub, self.db, self.listener = x

Я не уверен, что вы пытаетесь сделать с аргументом return_when и вызовом asyncio.wait. Функция create_connections не запускает набор параллельных задач, а ожидает каждого вызова, прежде чем перейти к следующему. Возможно, вы могли бы улучшить производительность, выполнив четыре вызова параллельно, но это другой вопрос.

...