Observable.FromAsyncPattern с UDPClient.EndReceive и ref параметром удаленной конечной точки - PullRequest
2 голосов
/ 13 декабря 2010

Я узнаю о Reactive расширениях и пытаюсь переформулировать часть моего кода.

UDPClient.EndReceive принимает параметр ref IPEndPoint, поэтому у меня в данный момент работает:

UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.ASCII.GetString(x)));

Что если моим подписчикам нужен доступ к удаленному IPEndPoint? В моем текущем воплощении я использую события и возвращаю пользовательский класс, который включает byte[] и IPEndPoint. Я не могу ради своей жизни понять, как это сделать с помощью Rx.

Ответы [ 2 ]

6 голосов
/ 14 декабря 2010

Если вы уже создали класс-оболочку для byte[] и IPEndPoint, почему бы не вернуть его как последовательность, используя Select:

private IObservable<RemoteData> GetRemoteDataAsync()
{
    return Observable.Defer(() => 
    {
        UdpClient receiverUDP = new UdpClient();
        receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, 
            SocketOptionName.ReuseAddress, true);
        receiverUDP.EnableBroadcast = true;
        receiverUDP.Client.ExclusiveAddressUse = false;
        receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

        IPEndPoint ep = null;
        return Observable.FromAsyncPattern<byte[]>(
                   receiverUDP.BeginReceive, 
                   (i) => receiverUDP.EndReceive(i, ref ep)
               )()
               .Select(bytes => new RemoteData(bytes, ep));
    });
}
3 голосов
/ 02 мая 2016

Для всех, кто ищет, есть немного более простой и современный способ сделать это, используя ReceiveAsync:

public static IObservable<UdpReceiveResult> UdpStream(IPEndPoint endpoint)
{
    return Observable.Using(() => new UdpClient(endpoint),
        udpClient => Observable.Defer(() =>
            udpClient.ReceiveAsync().ToObservable()).Repeat());
}

Вы можете позвонить с IPAddress.Any:

var stream = UdpStream(new IPEndPoint(IPAddress.Any, 514));

и затем используйте Select для проецирования потока на любой тип, который вы хотите.

...