Использование System.Reactive десериализации сообщений - PullRequest
2 голосов
/ 24 февраля 2012

В настоящее время у меня есть программа, которая прослушивает сетевой поток и генерирует события, когда новое сообщение было десериализовано.

while(true)
{
  byte[] lengthBytes = new byte[10];
  networkStream.Read(lengthBytes, 0, 10);
  int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes));
  var messageBytes = new byte[messageLength + 10];
  Array.Copy(lengthBytes, messageBytes, 10);
  int bytesReadTotal = 10;
  while (bytesReadTotal < 10 + messageLength)
    bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10);
  OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes)));
}

Я хочу переписать это, используя реактивные расширения, чтобы вместо события было IObservable<Message>.Это можно сделать с помощью

Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
  (h) => NewMessage += h,
  (h) => NewMessage -= h)
    .Select(  (e) => { return e.Message; });

Однако я бы предпочел переписать процесс прослушивания, используя System.Reactive вместо этого.Моя отправная точка (от здесь ) -

Func<byte[], int, int, IObservable<int>> read;   
read = Observable.FromAsyncPattern<byte[], int, int, int>(
networkStream.BeginRead,
networkStream.EndRead);

, которая позволяет

byte[] lengthBytes = new byte[10];
read(lengthBytes, 0, lengthBytes.Length).Subscribe(
{
  (bytesRead) => ;
});

Я изо всех сил пытаюсь понять, как продолжить.У кого-нибудь есть реализация?

Ответы [ 2 ]

1 голос
/ 24 февраля 2012

Я придумал следующее, но я чувствую, что это возможно без создания класса и использования Subject<T> (например, посредством некоторой проекции пакета заголовка на пакет тела на объект сообщения, но проблема в том, чтоEndRead() не возвращает байтовый массив, но количество прочитанных байтов. Поэтому вам нужен объект или, по крайней мере, замыкание в какой-то момент).

class Message
{
    public string Text { get; set; }
}

class MessageStream : IObservable<Message>
{
    private readonly Subject<Message> messages = new Subject<Message>();

    public void Start()
    {
        // Get your real network stream here.
        var stream  = Console.OpenStandardInput();
        GetNextMessage( stream );
    }

    private void GetNextMessage(Stream stream)
    {
        var header = new byte[10];
        var read = Observable.FromAsyncPattern<byte [], int, int, int>( stream.BeginRead, stream.EndRead );
        read( header, 0, 10 ).Subscribe( b =>
        {
            var bodyLength = BitConverter.ToInt32( header, 0 );
            var body = new byte[bodyLength];
            read( body, 0, bodyLength ).Subscribe( b2 =>
            {
                var message = new Message() {Text = Encoding.UTF8.GetString( body )};
                messages.OnNext( message );
                GetNextMessage( stream );
            } );
        } );
    }

    public IDisposable Subscribe( IObserver<Message> observer )
    {
        return messages.Subscribe( observer );
    }
}
0 голосов
/ 25 февраля 2012

Поскольку Observable.FromAsyncPattern выполняет асинхронный вызов только один раз, вам нужно создать функцию, которая будет вызывать ее несколько раз.Это должно помочь вам начать, но, вероятно, есть много возможностей для улучшения.Предполагается, что вы можете выполнять асинхронные вызовы несколько раз с одними и теми же аргументами, и предполагает, что selector будет обрабатывать любые проблемы, возникающие из этого.Итак:

Dim buffer(4096 - 1) As Byte
Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
                 AddressOf stream.BeginRead, AddressOf stream.EndRead,
                 Function(numRead)
                     If numRead < 0 Then Throw New ArgumentException("Invalid number read")
                     Console.WriteLine("Position after read: " & stream.Position.ToString())
                     Dim ret(numRead - 1) As Byte
                     Array.Copy(buffer, ret, numRead)
                     Return ret
                 End Function,
                 Function(numRead) numRead <= 0)
'this will be an observable of the chunk size you specify
Dim obs = obsFac(buffer, 0, buffer.Length)

Оттуда вам понадобится какая-то функция-накопитель, которая принимает байтовые массивы и выводит полные сообщения, когда они найдены.Скелет такой функции может выглядеть так:

Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message)
    Return Observable.Create(Of message)(
        Function(obs)
            Dim accumulator As New List(Of Byte)
            Return source.Subscribe(
                Sub(buffer)
                    'do some logic to build a packet here
                    accumulator.AddRange(buffer)
                    If True Then
                        obs.OnNext(New message())
                        'reset accumulator
                    End If
                End Sub,
                AddressOf obs.OnError,
                AddressOf obs.OnCompleted)
        End Function)
End Function
...