Повторите тему подписки поведения - PullRequest
0 голосов
/ 26 сентября 2018

Следующий код работает, как и ожидалось, но я озадачен тем, как он себя ведет, когда я раскомментирую строку 'o.OnCompleted ();'

Код объединяет всех подписчиков в результате одной длинной операциии кэширует результат для дальнейших подписчиков в течение 2 секунд.Любая подписка после этого времени снова запускает процесс.

Подписки будут поступать из других потоков (смоделированных с пулом потоков).

        var obs = Observable.Create((IObserver<Guid> o) =>
        {
            Console.WriteLine("Start");
            Thread.Sleep(1000); // process
            Console.WriteLine("End");
            o.OnNext(Guid.NewGuid());
            //o.OnCompleted(); // <-- uncomment this
            return Disposable.Empty;
        })
        .Replay(TimeSpan.FromSeconds(2))
        .RefCount()
        .Take(1);

        ThreadPool.QueueUserWorkItem(delegate
        {
            // simulate request from threadpool
            obs.Subscribe(x => Console.WriteLine($"1: {x}"), () => Console.WriteLine($"1: complete"));
        });

        ThreadPool.QueueUserWorkItem(delegate
        {
            obs.Subscribe(x => Console.WriteLine($"2: {x}"), () => Console.WriteLine($"2: complete"));
        });

        Thread.Sleep(4000);

        ThreadPool.QueueUserWorkItem(delegate
        {
            obs.Subscribe(x => Console.WriteLine($"3: {x}"), () => Console.WriteLine($"3: complete"));
        });

Вот результат:

Start
End
1: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
2: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
1: complete
2: complete
Start
End
3: 1214DC63-F688-475A-9CB7-C3784054A4AC
3: complete

Странное поведение, если я раскомментирую строку 'o.OnCompleted ()', результат изменится на следующий:

Start
End
1: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
2: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
1: complete
2: complete
Start
End
3: complete

Третий подписчик вызывает другую подписку на наблюдаемый корень, но результат отсутствует.Похоже, что ReplaySubject кэширует результат предыдущей наблюдаемой, завершенной, но все еще вызывает новую подписку.Это кажется не интуитивным.Я хотел бы понять, почему это не работает.

Примечание. Первоначально я попытался сделать это, используя Defer вместо Create, который имел тот же результат, что и второй запуск выше (по очевидным причинам).

1 Ответ

0 голосов
/ 26 сентября 2018

Когда вы используете пару Replay / RefCount, вы создаете наблюдаемую, которая разделяет общую подписку на исходную наблюдаемую.

Из источника:

Возвращаетподключаемая наблюдаемая последовательность, которая разделяет одну подписку на базовую последовательность, воспроизводящую все уведомления.

Теперь важно помнить, что наблюдаемая дает серию из нуля или более значений, за которыми следует либо полное, либо ошибкасигнал.Он не может создавать значения после завершения или возникновения ошибки.

Так как вы делитесь общей подпиской с источником, и если ваш источник создает завершение, он не может создать больше значений.Поэтому, когда вы звоните o.OnCompleted(), вы делаете именно это.

Кроме того, как примечание, вы должны избегать написания return Disposable.Empty; внутри Create.Это означает, что вы создаете наблюдаемое, чем можете завершить до возвращения подписки, и это может привести к условиям гонки.

Способ написания кода без него:

var obs =
    Observable
        .Defer(() => Observable.Return(Guid.NewGuid()).Concat(Observable.Never<Guid>()))
        .Replay(TimeSpan.FromSeconds(2.0))
        .RefCount()
        .Take(1);

Ноэто все равно что не звонить o.OnCompleted().

...