Я работаю с наблюдаемым потоком байтов, выходящим из сети, и я хотел бы поднять это на один уровень абстракции. Формат имеет два байта, которые содержат длину следующего сообщения. Я хотел бы, чтобы это вписывалось в реактивную структуру очень хорошо. То, что я имею до сих пор, кажется не совсем правильным, поэтому мне интересно, какие уловки я мог пропустить, чтобы устранить здесь цикл while.
Вот концепция, которую я имею в виду:
public static IObservable<Stream> GetToplevelStreams(IObservable<byte> byteStreamArg) {
return Observable.Create((IObserver<Stream>o)=>{
bool done = false;
var byteStream = byteStreamArg.Do(
b => { }, (ex) => { done = true; }, () => { done = true; });
while (!done)
{
var size = byteStream.Take(2).
Aggregate(0, (n, b) => (n << 8) + b).Single();
var buf = byteStream.Skip(2).Take(size);
var stream = new MemoryStream(buf.ToEnumerable().ToArray());
if (!done)
{
o.OnNext(stream);
}
}
return (() => {});
});
}