Я строю конвейер обработки сообщений и заметил, что, когда самый последний наблюдатель удаляет подписку, наблюдаемая все еще прокачивает данные.
Я посмотрел на документы Rx, и мое предположение, основанное на этом, заключалось в том, что RefCount () отключил бы наблюдаемое, как только последний наблюдатель отписался, согласно документам:
RefCount затем отслеживает, сколько других наблюдателей подписано на него, и не отключается от базового подключаемого Наблюдаемого , пока последний наблюдатель не сделал это .
Чтобы проиллюстрировать проблему, я создал очень минималистичный пример ниже:
class Program
{
static void Main(string[] args)
{
_ = SimulateObservableIssue();
Console.ReadKey();
}
public static async Task SimulateObservableIssue()
{
IObservable<int> source = Observable.Create<int>(async (observer) =>
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"Source publishing {i}");
observer.OnNext(i);
await Task.Delay(1000);
}
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observable is disposed"));
});
var multiSource = source.Publish().RefCount();
var subscription = multiSource.Subscribe(x => Console.WriteLine("Observer received: " + x));
await Task.Delay(3000);
subscription.Dispose();
Console.WriteLine("Subscription disposed");
}
}
и консольный вывод
Почему после 'subscription.Dispose ()' наблюдаемая все еще пытается произвести данные?
Спасибо!