Как отправить групповое сообщение каналов 2.x из задачи django-celery 3? - PullRequest
0 голосов
/ 25 июня 2018

Мне нужно отложить отправку каналов сообщения. Вот мой код:

# consumers.py
class ChatConsumer(WebsocketConsumer):
    def chat_message(self, event):
        self.send(text_data=json.dumps(event['message']))

    def connect(self):
        self.channel_layer.group_add(self.room_name, self.channel_name)
        self.accept()

    def receive(self, text_data=None, bytes_data=None):
        send_message_task.apply_async(
            args=(
                self.room_name,
                {'type': 'chat_message',
                 'message': 'the message'}
            ),
            countdown=10
        )

# tasks.py
@shared_task
def send_message_task(room_name, message):
    layer = get_channel_layer()
    layer.group_send(room_name, message)

Задача выполняется, и я не вижу никаких ошибок, но сообщение не отправляется. Это работает, только если я отправляю его из метода класса потребителя.

Я также пытался использовать AsyncWebsocketConsumer и отправлять с AsyncToSync (layer.group_send). Он выдает ошибку: «Вы не можете использовать AsyncToSync в том же потоке, что и цикл асинхронных событий - просто дождитесь асинхронной функции напрямую».

Затем я попытался объявить send_message_task как асинхронный и использовать await. Ничего не происходит снова (без ошибок), и я не уверен, что задача выполнена вообще.

Вот версии:

Django==1.11.13
redis==2.10.5
django-celery==3.2.2
channels==2.1.2
channels_redis==2.2.1

Настройки:

REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
BROKER_URL = 'redis://{}:6379/0'.format(REDIS_HOST)
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": ['redis://{}:6379/1'.format(REDIS_HOST)],
        },
    },
}

Есть идеи?

UPD: Только что обнаружил, что уровень канала Redis восстановлен, но его метод group_send не вызывается и просто пропускается.

UPD 2: Отправка с использованием AsyncToSync(layer.group_send) из консоли работает. Вызов задачи без apply_async также работает. Но запуск с apply_async вызывает ошибку You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. Определение задачи как асинхронной и использование await также, конечно, ломает все.

Ответы [ 4 ]

0 голосов
/ 29 ноября 2018

Возможно, это не прямой ответ на начальный вопрос, но это может помочь.Если вы получаете исключение «Вы не можете использовать AsyncToSync в том же потоке, что и цикл асинхронных событий - просто дождитесь асинхронной функции напрямую», тогда вы, вероятно, сделаете что-то из этого:

  1. цикл событий создан где-то
  2. некоторый код ASYNC запущен
  3. некоторый код SYNC вызывается из кода ASYNC
  4. Код SYNC пытается вызвать код ASYNC с помощью AsyncToSync, который предотвращает это

Кажется, что AsyncToSync обнаруживает внешний цикл событий и принимает решение не вмешиваться в него.

Решение состоит в том, чтобы напрямую включить ваш асинхронный вызов во внешний цикл событий.Пример кода приведен ниже, но лучше всего проверить вашу ситуацию и работает ли внешний цикл ...

loop = asyncio.get_event_loop()
loop.create_task(layer.group_send(room_name, {'type': 'chat_message', 'message': message}))
0 голосов
/ 26 июня 2018

Вам понадобится обертка async_to_sync() для соединения при использовании канальных слоев, потому что все методы канального уровня являются асинхронными.

def connect(self):
    async_to_sync(self.channel_layer.group_add(
        self.room_name, self.channel_name)
    self.accept()

То же самое относится к отправке сообщения из вашего задания по сельдерею.

@shared_task
def send_message_task(room_name, message):
    channel_layer = get_channel_layer()

    async_to_sync(channel_layer.group_send)(
        room_name,
        {'type': 'chat_message', 'message': message}
    )

Также вы можете просто вызвать ваше задание по сельдерею из receive() вашего потребителя, например:

send_message_task.delay(self.room_name, 'your message here')

Что касается ошибки AsyncToSync, вам необходимо обновить каналы и установить новую версию, как указано в этой теме .

0 голосов
/ 27 июня 2018

Я нашел уродливое и неэффективное решение, но оно работает:

@shared_task
def send_message_task(room_name, message):
    def sender(room_name, message):
        channel_layer = get_channel_layer()

        AsyncToSync(channel_layer.group_send)(
            room_name,
            {'type': 'chat_message', 'message': message}
        )

    thread = threading.Thread(target=sender, args=(room_name, message,))
    thread.start()
    thread.join()

Если кто-то сможет его улучшить, я буду признателен.

0 голосов
/ 25 июня 2018

Проблема в вашем коде в том, что вы использовали underscore в своем типе chat_message.Я полагаю, что вы пропустили это в документации:

Имя метода будет типом события с периодами, замененными подчеркиваниями - так, например, событие, поступающее через слой канала стип chat.join будет обрабатываться методом chat_join.

Так что в вашем случае тип будет chat.message

{
    'type': 'chat.message',
    'message': 'the message'
}
...