Как написать юнит-тесты для потребителей в django каналах? - PullRequest
0 голосов
/ 08 февраля 2020

Я новичок в django-channels. Я использую channels 2.3.1. У меня есть следующий код.

основная маршрутизация

websocket_urlpatterns = [
    url(r'^ws/events/(?P<room_type>[^/]+)/(?P<room_id>[^/]+)/$', consumers.Consumer, name='core-consumer'),
]

маршрутизация приложения

application = ProtocolTypeRouter({
    'websocket': URLRouter(core.routing.websocket_urlpatterns),
})

потребитель

class Consumer(AsyncWebsocketConsumer):
    async def connect(self):
        room_type = self.scope['url_route']['kwargs']['room_type']
        room_id = self.scope['url_route']['kwargs']['room_id']
        self.room_group_name = f'{room_type}_{room_id}'

        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def receive(self, text_data = None, bytes_data = None):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        await self.channel_layer.group_send(self.room_group_name,
            {'type': 'send_data', 'message': message}
        )

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    async def send_data(self, event):
        message = event['message']
        await self.send(text_data=json.dumps({
            'data': message
        }))

def send_event(channel='', message=''):
    channel_layer = get_channel_layer()
    asyncio.run(channel_layer.group_send(channel,
        {
            'type': 'send_data',
            'message': message
        }
    ))

сигнал

    def activate_notification(self):
        send_event(f'user_{self.id}',
                   {'message': f"Welcome back to one deeds!"})

Как проверить каждую функцию внутри consumer, используя unittest?

I прочитайте документацию , но ничего не понимаете (

Я хочу запустить тест с помощью команды ./manage.py test

Я пытался протестировать это с WebsocketCommunicator

1 Ответ

1 голос
/ 11 февраля 2020

Я бы предложил несколько вещей.

1) используйте слой channels.layers.InMemoryChannelLayer.

2) используйте pytest-asyncio, чтобы ваш тестовый код мог быть async методами

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_get():
   ....

для запуска этих тестов вы будете использовать pytest

3) используйте WebsocketCommunicator, проверьте эти примеры .

...