Учитывая Django Потребителя каналов, который выглядит следующим образом:
class NotificationConsumer(JsonWebsocketConsumer):
def connect(self):
user = self.scope["user"]
async_to_sync(self.channel_layer.group_add)("hello", "hello")
self.accept()
async_to_sync(self.channel_layer.group_send)(
"hello", {"type": "chat.message", "content": "hello"}
)
def receive_json(self, content, **kwargs):
print(content)
async_to_sync(self.channel_layer.group_send)(
"hello", {"type": "chat.message", "content": "hello"}
)
print("Here we are")
def chat_message(self, event):
self.send_json(content=event["content"])
def disconnect(self, close_code):
# Leave room group
async_to_sync(self.channel_layer.group_discard)("hello", "hello")
и тест, который выглядит следующим образом:
@pytest.mark.asyncio
class TestWebsockets:
async def test_receives_data(self, settings):
communicator = WebsocketCommunicator(
application=application, path="/ws/notifications/"
)
connected, _ = await communicator.connect()
assert connected
await communicator.send_json_to({"type": "notify", "data": "who knows"})
response = await communicator.receive_json_from()
await communicator.disconnect()
Я всегда получаю TimeoutError
когда я запускаю тест. Что мне нужно сделать по-другому?
Если вы хотите увидеть пример полного репо, посмотрите https://github.com/phildini/websockets-test