Реактивные рамки устранить цикл while в Observable.Create - PullRequest
4 голосов
/ 14 мая 2011

Я работаю с наблюдаемым потоком байтов, выходящим из сети, и я хотел бы поднять это на один уровень абстракции. Формат имеет два байта, которые содержат длину следующего сообщения. Я хотел бы, чтобы это вписывалось в реактивную структуру очень хорошо. То, что я имею до сих пор, кажется не совсем правильным, поэтому мне интересно, какие уловки я мог пропустить, чтобы устранить здесь цикл while.

Вот концепция, которую я имею в виду:

public static IObservable<Stream> GetToplevelStreams(IObservable<byte> byteStreamArg) {
    return Observable.Create((IObserver<Stream>o)=>{
        bool done = false;
        var byteStream = byteStreamArg.Do(
            b => { }, (ex) => { done = true; }, () => { done = true; });
        while (!done)
        {
            var size = byteStream.Take(2).
                           Aggregate(0, (n, b) => (n << 8) + b).Single();
            var buf = byteStream.Skip(2).Take(size);
            var stream = new MemoryStream(buf.ToEnumerable().ToArray());
            if (!done)
            {
                o.OnNext(stream);
            }
        }
        return (() => {});
    });
}

1 Ответ

2 голосов
/ 14 мая 2011

IObservable здесь немного странный - помните, что вы возвращаете «Будущий список потоков» - я бы на самом деле просто возвратил Stream или, возможно, IObservable<byte[]>, где каждый массив представляет сообщение. Или сделай еще лучше и верни IObservable<ParsedMessage>

Кроме того, ваш цикл while делает это не асинхронным и ведет себя странно. Как насчет чего-то вроде этого:

public static IObservable<System.IO.Stream> GetToplevelStreams(IObservable<byte> byteStream)
{
    return Observable.Create((IObserver<System.IO.Stream> o) =>
    {
        int? size1=null;
        int? size=null;
        var buf = new MemoryStream();
        var subscription = byteStream.Subscribe(v =>
        {
            if (!size1.HasValue)
            {
                size1 = ((int)v) << 8;
            }
            else if (!size.HasValue)
            {
                size = size1.Value + v;
            }
            else
            {
                buf.WriteByte(v);
            }
            if (size.HasValue && buf.Length == size)
            {
                buf.Position = 0;
                o.OnNext(buf);
                buf.SetLength(0);
                size1 = null;
                size = null;
            }

        }, (ex)=>o.OnError(ex), ()=>o.OnCompleted());
        return () => subscription.Dispose();
    });
}
...