Невозможно получить тесты websockets для прохождения по Django каналам - PullRequest
0 голосов
/ 04 мая 2020

Учитывая Django Потребителя каналов, который выглядит следующим образом:

class NotificationConsumer(JsonWebsocketConsumer):
    def connect(self):
        user = self.scope["user"]
        async_to_sync(self.channel_layer.group_add)("hello", "hello")
        self.accept()
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )

    def receive_json(self, content, **kwargs):
        print(content)
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )
        print("Here we are")

    def chat_message(self, event):
        self.send_json(content=event["content"])

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)("hello", "hello")

и тест, который выглядит следующим образом:

@pytest.mark.asyncio
class TestWebsockets:

    async def test_receives_data(self, settings):

        communicator = WebsocketCommunicator(
            application=application, path="/ws/notifications/"
        )
        connected, _ = await communicator.connect()
        assert connected
        await communicator.send_json_to({"type": "notify", "data": "who knows"})
        response = await communicator.receive_json_from()
        await communicator.disconnect()

Я всегда получаю TimeoutError когда я запускаю тест. Что мне нужно сделать по-другому?

Если вы хотите увидеть пример полного репо, посмотрите https://github.com/phildini/websockets-test

Ответы [ 2 ]

2 голосов
/ 05 мая 2020

не должно async_to_sync(self.channel_layer.group_add)("hello", "hello") быть async_to_sync(self.channel_layer.group_add)("hello", self.channel_name)? В первом случае вы добавляете «hello» в группу, и communicator.receive_json_from() в тесте завершится неудачей, так как group_send не будет получен тестовым клиентом.

Путем рефакторинга класса как:

class NotificationConsumer(JsonWebsocketConsumer):
    def connect(self):
        user = self.scope["user"]
        async_to_sync(self.channel_layer.group_add)("hello", self.channel_name)
        self.accept()
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )

    def receive_json(self, content, **kwargs):
        print(content)
        async_to_sync(self.channel_layer.group_send)(
            "hello", {"type": "chat.message", "content": "hello"}
        )
        print("Here we are")

    def chat_message(self, event):
        self.send_json(content=event["content"])

    def disconnect(self, close_code):
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)("hello", self.channel_name)

Я могу получить тесты из образца репо

0 голосов
/ 04 мая 2020

Для тестирования кода asyn c каналов лучше всего использовать чисто функциональные тесты asyn c.

@pytest.mark.asyncio
async def test_receives_data(settings):
    communicator = WebsocketCommunicator(
        application=application, path="/ws/notifications/"
    )
    connected, _ = await communicator.connect()
    assert connected
    await communicator.send_json_to({"type": "notify", "data": "who knows"})
    response = await communicator.receive_json_from()
    await communicator.disconnect()

pytest позволит вам смешивать тезисы с class регулярными Django тестами.

Здесь вы можете найти несколько примеров тестирования потребителей.

https://github.com/hishnash/djangochannelsrestframework/tree/master/tests

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...