Я прочитал как добавить сопрограмму в работающий асинхронный цикл? но это не так, как мне нужно
В основном мне нужен поток демона, чтобы подписаться на канал Redis, и я могу добавить динамический метод обратного вызова, мое решение - это создать подкласс класса Thread, создать цикл обработки событий и запускаться вечно, но после выполнения цикла я не могу вызвать любой метод объекта,
redis.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import os
import asyncio
import aioredis
from threading import Thread
from collections import defaultdict
assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None
class RedisClient(Thread):
def __init__(self, loop):
super(RedisClient, self).__init__()
self.callbacks = defaultdict(list)
self.channels = {}
self.loop = loop
async def pubsub(self):
address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
self.sub = await aioredis.create_redis(address)
def sync_add_callback(self, channel, callback):
self.loop.create_task(self.add_callback(channel, callback))
async def add_callback(self, channel, callback):
self.callbacks[channel].append(callback)
if channel not in self.channels or self.channels[channel] is None:
channels = await self.sub.subscribe(channel)
ch1 = channels[0]
assert isinstance(ch1, aioredis.Channel)
self.channels[channel] = ch1
async def async_reader(channel):
while await channel.wait_message():
msg = await channel.get(encoding='utf-8')
# ... process message ...
print(msg)
print(channel.name)
for c in self.callbacks[channel.name.decode('utf-8')]:
c(channel.name, msg)
tsk1 = asyncio.ensure_future(async_reader(ch1))
def remove_callback(self, channel, callback):
self.callbacks[channel].remove(callback)
def run(self):
asyncio.set_event_loop(self.loop)
loop.run_until_complete(self.pubsub())
# Create the new loop and worker thread
loop = asyncio.new_event_loop()
redis_client = RedisClient(loop)
redis_client.start()
использование:
def test(channel, msg):
print('{}{}'.format(channel, msg))
from redis import redis_client
redis_client.sync_add_callback('test', test)
Может быть, мое решение не является хорошей практикой в Python?
Обновление 1:
Я попробовал решение, и оно работает хорошо, но вначале я хочу повторно использовать экземпляр sub
, этот метод может работать как модуль для подписки на другой канал, но каждая подписка должна иметь свою собственную sub
или то есть каждая подписка должна создать свое собственное соединение с Redis
решение:
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import os
import asyncio
import aioredis
from threading import Thread
assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None
class RedisClient(Thread):
def __init__(self, channel, callback, *args, **kwargs):
super(RedisClient, self).__init__(*args, **kwargs)
self.daemon = True
self.channel = channel
self.callback = callback
async def pubsub(self):
address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
sub = await aioredis.create_redis(address)
channels = await sub.subscribe(self.channel)
ch1 = channels[0]
assert isinstance(ch1, aioredis.Channel)
async def async_reader(channel):
while await channel.wait_message():
msg = await channel.get(encoding='utf-8')
self.callback(channel.name.decode('utf-8'), msg)
await async_reader(ch1)
def run(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.pubsub())
обновление 2:
наконец, это хорошо работает
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
import os
import asyncio
import aioredis
from threading import Thread
from collections import defaultdict
assert os.getenv('REDIS_HOST') is not None
assert os.getenv('REDIS_PORT') is not None
class RedisClient(Thread):
def __init__(self, loop):
super(RedisClient, self).__init__()
self.callbacks = defaultdict(list)
self.channels = {}
self.loop = loop
self.sub = None
async def pubsub(self):
print('test3')
address = 'redis://{}:{}'.format(os.getenv('REDIS_HOST'), os.getenv('REDIS_PORT'))
self.sub = await aioredis.create_redis(address)
def sync_add_callback(self, channel, callback):
print('ahhhhhhhhh')
asyncio.run_coroutine_threadsafe(self.add_callback(channel, callback), self.loop)
async def add_callback(self, channel, callback):
print('test2')
if not self.sub:
await self.pubsub()
self.callbacks[channel].append(callback)
if channel not in self.channels or self.channels[channel] is None:
channels = await self.sub.subscribe(channel)
ch1 = channels[0]
assert isinstance(ch1, aioredis.Channel)
self.channels[channel] = ch1
async def async_reader(channel):
while await channel.wait_message():
msg = await channel.get(encoding='utf-8')
# ... process message ...
print(msg)
print(channel.name)
print(self.callbacks[channel.name])
for c in self.callbacks[channel.name.decode('utf-8')]:
c(channel.name, msg)
tsk1 = asyncio.ensure_future(async_reader(ch1))
def remove_callback(self, channel, callback):
self.callbacks[channel].remove(callback)
def run(self):
asyncio.set_event_loop(self.loop)
loop.run_forever()
# Create the new loop and worker thread
loop = asyncio.new_event_loop()
redis_client = RedisClient(loop)
redis_client.start()