Как я могу сделать следующее наблюдаемое повторение, пока stream.DataAvailable не станет ложным?В настоящее время это выглядит так, как будто оно никогда не останавливается.
AsyncReadChunk и Observable.Return внутри секции Defer вызывают OnNext, а затем OnCompleted.Когда Repeat получает вызов OnNext, он передает его в TakeWhile.Когда TakeWhile не удовлетворяется, он завершает наблюдаемое, но я думаю, что OnCompleted, который появляется сразу после OnNext, настолько быстр, что заставляет Repeat повторно подписаться на наблюдаемое и вызывает бесконечный цикл.
Как я могу исправитьтакое поведение?
public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
return Observable.Defer(() =>
{
try
{
return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
}
catch (Exception)
{
return Observable.Return(new byte[0]);
}
})
.Repeat()
.TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}