Быстрое повторение TakeWhile вызывает бесконечный цикл - PullRequest
6 голосов
/ 27 ноября 2011

Как я могу сделать следующее наблюдаемое повторение, пока stream.DataAvailable не станет ложным?В настоящее время это выглядит так, как будто оно никогда не останавливается.

AsyncReadChunk и Observable.Return внутри секции Defer вызывают OnNext, а затем OnCompleted.Когда Repeat получает вызов OnNext, он передает его в TakeWhile.Когда TakeWhile не удовлетворяется, он завершает наблюдаемое, но я думаю, что OnCompleted, который появляется сразу после OnNext, настолько быстр, что заставляет Repeat повторно подписаться на наблюдаемое и вызывает бесконечный цикл.

Как я могу исправитьтакое поведение?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
    return Observable.Defer(() =>
        {
            try
            {
                return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
            }
            catch (Exception)
            {
                return Observable.Return(new byte[0]);
            }
        })
        .Repeat()
        .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}

1 Ответ

2 голосов
/ 29 ноября 2011

ОТВЕТ СЕБЯ: (Ниже приведен ответ, опубликованный Саметом, автором вопроса. Однако он разместил ответ как часть вопроса. Я перевожу его в отдельный раздел.ответ, помеченный как вики сообщества, так как автор сам не переместил его.)


Я обнаружил путем рефакторинга, что это проблема с планировщиками.Return использует планировщик Immediate, а Repeat использует CurrentThread.Фиксированный код ниже.

    public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
    {
        return Observable.Defer(() =>
                                    {
                                        try
                                        {
                                            return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread);
                                        }
                                        catch (Exception)
                                        {
                                            return Observable.Return(new byte[0], Scheduler.CurrentThread);
                                        }
                                    })
            .Repeat()
            .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
    }
...