Следующий код работает, как и ожидалось, но я озадачен тем, как он себя ведет, когда я раскомментирую строку 'o.OnCompleted ();'
Код объединяет всех подписчиков в результате одной длинной операциии кэширует результат для дальнейших подписчиков в течение 2 секунд.Любая подписка после этого времени снова запускает процесс.
Подписки будут поступать из других потоков (смоделированных с пулом потоков).
var obs = Observable.Create((IObserver<Guid> o) =>
{
Console.WriteLine("Start");
Thread.Sleep(1000); // process
Console.WriteLine("End");
o.OnNext(Guid.NewGuid());
//o.OnCompleted(); // <-- uncomment this
return Disposable.Empty;
})
.Replay(TimeSpan.FromSeconds(2))
.RefCount()
.Take(1);
ThreadPool.QueueUserWorkItem(delegate
{
// simulate request from threadpool
obs.Subscribe(x => Console.WriteLine($"1: {x}"), () => Console.WriteLine($"1: complete"));
});
ThreadPool.QueueUserWorkItem(delegate
{
obs.Subscribe(x => Console.WriteLine($"2: {x}"), () => Console.WriteLine($"2: complete"));
});
Thread.Sleep(4000);
ThreadPool.QueueUserWorkItem(delegate
{
obs.Subscribe(x => Console.WriteLine($"3: {x}"), () => Console.WriteLine($"3: complete"));
});
Вот результат:
Start
End
1: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
2: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
1: complete
2: complete
Start
End
3: 1214DC63-F688-475A-9CB7-C3784054A4AC
3: complete
Странное поведение, если я раскомментирую строку 'o.OnCompleted ()', результат изменится на следующий:
Start
End
1: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
2: 255BEFDC-2F14-40AD-AE77-2B005C5A3AA9
1: complete
2: complete
Start
End
3: complete
Третий подписчик вызывает другую подписку на наблюдаемый корень, но результат отсутствует.Похоже, что ReplaySubject кэширует результат предыдущей наблюдаемой, завершенной, но все еще вызывает новую подписку.Это кажется не интуитивным.Я хотел бы понять, почему это не работает.
Примечание. Первоначально я попытался сделать это, используя Defer вместо Create, который имел тот же результат, что и второй запуск выше (по очевидным причинам).