Моя цель - написать простую комнату, где клиенты могут видеть друг друга и общаться друг с другом. Это без использования чего-либо, кроме aiort c и aiohttp.
Я пытаюсь написать модульный тест, то есть соединить двух пиров (мы можем назвать одного «предъявителя» и одного «зрителя»).
Я "просто" хочу подключить их в модульном тесте и отправить что-то от одного к другому. Видеопоток идеален. Для этого я использую пример FlagVideoStream.
Вот мой тестовый пример:
import pytest
import asyncio
import numpy
import cv2
from aiortc import (
RTCIceCandidate,
RTCPeerConnection,
RTCSessionDescription,
VideoStreamTrack,
)
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder
from aiortc.contrib.signaling import add_signaling_arguments, create_signaling
from rtc.rtcevent import RTCEvent, TcpSocketSignaling
from . flag_videostream_helper import FlagVideoStreamTrack
from .util import run, delay
class TestRTCEvent:
async def run_viewer(self, presenter, viewer, presenter_signaling, viewer_signalling):
await presenter_signaling.connect()
await viewer_signalling.connect()
await presenter.setLocalDescription(await presenter.createOffer())
res = asyncio.gather(presenter_signaling.send(presenter.localDescription),
delay(viewer_signalling.receive))
def test_can_presenter_make_offer_viewer_accept(self, rtc_presenter, rtc_viewer, loop):
@rtc_viewer.on("track")
def on_track(track):
print("Receiving %s" % track.kind)
recorder.addTrack(track)
presenter = RTCPeerConnection()
viewer = RTCPeerConnection()
presenter.addTrack(FlagVideoStreamTrack)
try:
presenter_signaling = TcpSocketSignaling("127.0.0.1",9001)
viewer_signaling = TcpSocketSignaling("127.0.0.1", 9002)
loop.run_until_complete(
self.run_viewer(presenter, viewer, presenter_signaling, viewer_signaling)
)
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(presenter.close())
loop.run_until_complete(viewer.close())
loop.run_until_complete(presenter_signaling.close())
loop.run_until_complete(viewer_signaling.close())
Я понимаю, что мне нужно добавить трек к докладчику, прежде чем делать предложение, но тестирование дело просто выполняется без чего-либо особенного. В идеале, поскольку я не могу «увидеть» то, что видит зритель, я бы хотел передать этот поток на рекордер.
Это я пытался сделать с событием @ rtc_viewer.on ("track").
Пожалуйста, кто-нибудь может помочь с этим примером использования / теста? Немного ошеломляет, если не сказать больше о том, как установить соединение между двумя одноранговыми узлами таким образом, поскольку примеры включают в себя класс сигнализации копирования и вставки, но ни один из примеров не использует класс сигнализации TCP. Я нашел тестовый пример в тестовом коде, но он слишком прост c в том смысле, что он ничего не отправляет.