Пытаюсь осмыслить поток сообщений с помощью django каналов - PullRequest
2 голосов
/ 26 мая 2020

Пытаюсь осознать Django каналов. Я совершенно новичок в программировании asyn c и пытаюсь понять, почему мой код ведет себя так.

В настоящее время я создаю приложение, используя Django каналов, в настоящее время использую канал в памяти Layer в settings.py :

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer"
    }
}

Я пытаюсь запустить длительную задачу через веб-сокет и хочу, чтобы потребитель отправлял клиенту периодические c обновления.

Пример кода:

import time
from asgiref.sync import async_to_sync
from channels.generic.websocket import JsonWebsocketConsumer

class Consumer(JsonWebsocketConsumer):

    def connect(self):
        print("connected to consumer")
        async_to_sync(self.channel_layer.group_add)(
            f'consumer_group',
            self.channel_name
        )
        self.accept()

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(
            'consumer_group',
            self.channel_name
        )
        self.close()

    def long_running_thing(self, event):

        for i in range(5):
            time.sleep(0.2)
            async_to_sync(self.channel_layer.group_send)(
                'consumer_group',
                {
                    "type": "log.progress",
                    "data": i
                }
            )
            print("long_running_thing", i)

    def log_progress(self, event):
        print("log_progress", event['data'])

    def receive_json(self, content, **kwargs):
        print(f"Received event: {content}")
        if content['action'] == "start_long_running_thing":
            async_to_sync(self.channel_layer.group_send)(
                'consumer_group',
                {
                    "type": "long.running.thing",
                    "data": content['data']
                }
            )

Потребитель запускает long_running_thing, как только он получает правильное действие. Однако вызовы log_progress происходят после завершения long_running_thing.

Вывод:

Received event: {'action': 'start_long_running_thing', 'data': {}}
long_running_thing 0
long_running_thing 1
long_running_thing 2
long_running_thing 3
long_running_thing 4
log_progress 0
log_progress 1
log_progress 2
log_progress 3
log_progress 4

Может ли кто-нибудь объяснить мне, почему это так и как Я могу регистрировать ход выполнения?

Изменить : добавлено routing.py и часть JavaScript.

from django.urls import re_path

from sockets import consumers

websocket_urlpatterns = [
    re_path(r'$', consumers.Consumer),
]

Сейчас я использую vue. js с vue -native-websocket, это соответствующая часть во внешнем интерфейсе.

const actions = {
  startLongRunningThing(context){
    const message = {
      action: "start_long_running_thing",
      data: {}
    }
    Vue.prototype.$socket.send(JSON.stringify(message))
}

1 Ответ

0 голосов
/ 26 мая 2020

Я также начинаю с программирования asyn c, но я предлагаю вам использовать вместо AsyncJsonWebsocketConsumer , а после отправки событий через channel_layer используйте send_json функция:

import asyncio
import json
from channels.generic.websocket import AsyncJsonWebsocketConsumer


class Consumer(AsyncJsonWebsocketConsumer):

    async def connect(self):
        print("connected to consumer")
        await self.channel_layer.group_add(
            f'consumer_group',
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            'consumer_group',
            self.channel_name
        )
        await self.close()

    async def task(self):
        for i in range(5):
            print("long_running_thing: ", i)
            await asyncio.sleep(i)
            print("log_progress: ", i)
            await self.send_json({
                'log_progress': i
            })

    async def long_running_thing(self, event):
        loop = asyncio.get_event_loop()
        task = loop.create_task(self.task())
        await task


    async def receive_json(self, content, **kwargs):
        print(f"Received event: {content}")
        if content['action'] == "start_long_running_thing":
            await self.channel_layer.group_send(
                'consumer_group',
                {
                    "type": "long.running.thing",
                    "data": content['data']
                }
            )

Вывод:

INFO: ('127.0.0.1', ) - "WebSocket /" [accepted]
Received event: {'action': 'start_long_running_thing', 'data': {}}
long_running_thing:  0
log_progress:  0
long_running_thing:  1
log_progress:  1
long_running_thing:  2
log_progress:  2
long_running_thing:  3
log_progress:  3
long_running_thing:  4
log_progress:  4
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...