Мне нужно отложить отправку каналов сообщения. Вот мой код:
# consumers.py
class ChatConsumer(WebsocketConsumer):
def chat_message(self, event):
self.send(text_data=json.dumps(event['message']))
def connect(self):
self.channel_layer.group_add(self.room_name, self.channel_name)
self.accept()
def receive(self, text_data=None, bytes_data=None):
send_message_task.apply_async(
args=(
self.room_name,
{'type': 'chat_message',
'message': 'the message'}
),
countdown=10
)
# tasks.py
@shared_task
def send_message_task(room_name, message):
layer = get_channel_layer()
layer.group_send(room_name, message)
Задача выполняется, и я не вижу никаких ошибок, но сообщение не отправляется. Это работает, только если я отправляю его из метода класса потребителя.
Я также пытался использовать AsyncWebsocketConsumer и отправлять с AsyncToSync (layer.group_send). Он выдает ошибку: «Вы не можете использовать AsyncToSync в том же потоке, что и цикл асинхронных событий - просто дождитесь асинхронной функции напрямую».
Затем я попытался объявить send_message_task как асинхронный и использовать await. Ничего не происходит снова (без ошибок), и я не уверен, что задача выполнена вообще.
Вот версии:
Django==1.11.13
redis==2.10.5
django-celery==3.2.2
channels==2.1.2
channels_redis==2.2.1
Настройки:
REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
BROKER_URL = 'redis://{}:6379/0'.format(REDIS_HOST)
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": ['redis://{}:6379/1'.format(REDIS_HOST)],
},
},
}
Есть идеи?
UPD: Только что обнаружил, что уровень канала Redis восстановлен, но его метод group_send не вызывается и просто пропускается.
UPD 2: Отправка с использованием AsyncToSync(layer.group_send)
из консоли работает. Вызов задачи без apply_async
также работает. Но запуск с apply_async
вызывает ошибку You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly
. Определение задачи как асинхронной и использование await
также, конечно, ломает все.