В настоящее время у меня есть программа, которая прослушивает сетевой поток и генерирует события, когда новое сообщение было десериализовано.
while(true)
{
byte[] lengthBytes = new byte[10];
networkStream.Read(lengthBytes, 0, 10);
int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes));
var messageBytes = new byte[messageLength + 10];
Array.Copy(lengthBytes, messageBytes, 10);
int bytesReadTotal = 10;
while (bytesReadTotal < 10 + messageLength)
bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10);
OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes)));
}
Я хочу переписать это, используя реактивные расширения, чтобы вместо события было IObservable<Message>
.Это можно сделать с помощью
Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
(h) => NewMessage += h,
(h) => NewMessage -= h)
.Select( (e) => { return e.Message; });
Однако я бы предпочел переписать процесс прослушивания, используя System.Reactive вместо этого.Моя отправная точка (от здесь ) -
Func<byte[], int, int, IObservable<int>> read;
read = Observable.FromAsyncPattern<byte[], int, int, int>(
networkStream.BeginRead,
networkStream.EndRead);
, которая позволяет
byte[] lengthBytes = new byte[10];
read(lengthBytes, 0, lengthBytes.Length).Subscribe(
{
(bytesRead) => ;
});
Я изо всех сил пытаюсь понять, как продолжить.У кого-нибудь есть реализация?