Как добавить сопрограмму для запуска цикла событий? - PullRequest
0 голосов
/ 11 ноября 2018

Я прочитал как добавить сопрограмму в работающий асинхронный цикл? но это не так, как мне нужно

В основном мне нужен поток демона, чтобы подписаться на канал Redis, и я могу добавить динамический метод обратного вызова, мое решение - это создать подкласс класса Thread, создать цикл обработки событий и запускаться вечно, но после выполнения цикла я не могу вызвать любой метод объекта,

redis.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import os
import asyncio
import aioredis
from threading import Thread
from collections import defaultdict

assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None

class RedisClient(Thread):
    def __init__(self, loop):
        super(RedisClient, self).__init__()
        self.callbacks = defaultdict(list)
        self.channels = {}
        self.loop = loop

    async def pubsub(self):
        address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
        self.sub = await aioredis.create_redis(address)

    def sync_add_callback(self, channel, callback):
        self.loop.create_task(self.add_callback(channel, callback))

    async def add_callback(self, channel, callback):
        self.callbacks[channel].append(callback)

        if channel not in self.channels or self.channels[channel] is None:
            channels = await self.sub.subscribe(channel)
            ch1 = channels[0]
            assert isinstance(ch1, aioredis.Channel)
            self.channels[channel] = ch1

            async def async_reader(channel):
                while await channel.wait_message():
                    msg = await channel.get(encoding='utf-8')
                    # ... process message ...
                    print(msg)
                    print(channel.name)
                    for c in self.callbacks[channel.name.decode('utf-8')]:
                        c(channel.name, msg)

            tsk1 = asyncio.ensure_future(async_reader(ch1))

    def remove_callback(self, channel, callback):
        self.callbacks[channel].remove(callback)

    def run(self):
        asyncio.set_event_loop(self.loop)
        loop.run_until_complete(self.pubsub())


# Create the new loop and worker thread
loop = asyncio.new_event_loop()
redis_client = RedisClient(loop)
redis_client.start()

использование:

def test(channel, msg):
    print('{}{}'.format(channel, msg))

from redis import redis_client
redis_client.sync_add_callback('test', test)

Может быть, мое решение не является хорошей практикой в ​​Python?

Обновление 1:

Я попробовал решение, и оно работает хорошо, но вначале я хочу повторно использовать экземпляр sub, этот метод может работать как модуль для подписки на другой канал, но каждая подписка должна иметь свою собственную sub или то есть каждая подписка должна создать свое собственное соединение с Redis

решение:

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import asyncio
import aioredis
from threading import Thread

assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None

class RedisClient(Thread):
    def __init__(self, channel, callback, *args, **kwargs):
        super(RedisClient, self).__init__(*args, **kwargs)
        self.daemon = True
        self.channel = channel
        self.callback = callback

    async def pubsub(self):
        address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
        sub = await aioredis.create_redis(address)

        channels = await sub.subscribe(self.channel)
        ch1 = channels[0]
        assert isinstance(ch1, aioredis.Channel)

        async def async_reader(channel):
            while await channel.wait_message():
                msg = await channel.get(encoding='utf-8')
                self.callback(channel.name.decode('utf-8'), msg)

        await async_reader(ch1)

    def run(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.pubsub())

обновление 2:

наконец, это хорошо работает

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import os
import asyncio
import aioredis
from threading import Thread
from collections import defaultdict

assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None

class RedisClient(Thread):
    def __init__(self, loop):
        super(RedisClient, self).__init__()
        self.callbacks = defaultdict(list)
        self.channels = {}
        self.loop = loop
        self.sub = None

    async def pubsub(self):
        print('test3')
        address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
        self.sub = await aioredis.create_redis(address)

    def sync_add_callback(self, channel, callback):
        print('ahhhhhhhhh')
        asyncio.run_coroutine_threadsafe(self.add_callback(channel, callback), self.loop)

    async def add_callback(self, channel, callback):
        print('test2')
        if not self.sub:
            await self.pubsub()
        self.callbacks[channel].append(callback)

        if channel not in self.channels or self.channels[channel] is None:
            channels = await self.sub.subscribe(channel)
            ch1 = channels[0]
            assert isinstance(ch1, aioredis.Channel)
            self.channels[channel] = ch1

            async def async_reader(channel):
                while await channel.wait_message():
                    msg = await channel.get(encoding='utf-8')
                    # ... process message ...
                    print(msg)
                    print(channel.name)
                    print(self.callbacks[channel.name])
                    for c in self.callbacks[channel.name.decode('utf-8')]:
                        c(channel.name, msg)

            tsk1 = asyncio.ensure_future(async_reader(ch1))

    def remove_callback(self, channel, callback):
        self.callbacks[channel].remove(callback)

    def run(self):
        asyncio.set_event_loop(self.loop)
        loop.run_forever()


# Create the new loop and worker thread
loop = asyncio.new_event_loop()
redis_client = RedisClient(loop)
redis_client.start()

Ответы [ 2 ]

0 голосов
/ 12 ноября 2018

Если идея заключается в том, чтобы sync_add_callback вызывался из других потоков, то его реализация должна выглядеть следующим образом:

def sync_add_callback(self, channel, callback):
    asyncio.run_coroutine_threadsafe(self.add_callback(channel, callback), self.loop)

Обратите внимание, что обратные вызовы будут вызываться в потоке цикла событий, поэтому они не должны сами использовать блокирующие вызовы.

0 голосов
/ 12 ноября 2018

позвольте мне показать вам аналогичный случай, используя aiohttp здесь.

async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        await sub.unsubscribe(ch.name)
        await sub.quit()


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    app['redis_listener'].cancel()
    await app['redis_listener']


app = web.Application()
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...