Наблюдаемый из RefCount () не прекращает публикацию - PullRequest
0 голосов
/ 20 апреля 2019

Я строю конвейер обработки сообщений и заметил, что, когда самый последний наблюдатель удаляет подписку, наблюдаемая все еще прокачивает данные.

Я посмотрел на документы Rx, и мое предположение, основанное на этом, заключалось в том, что RefCount () отключил бы наблюдаемое, как только последний наблюдатель отписался, согласно документам:

RefCount затем отслеживает, сколько других наблюдателей подписано на него, и не отключается от базового подключаемого Наблюдаемого , пока последний наблюдатель не сделал это .

Чтобы проиллюстрировать проблему, я создал очень минималистичный пример ниже:

class Program
{
    static void Main(string[] args)
    {
        _ = SimulateObservableIssue();

        Console.ReadKey();
    }

    public static async Task SimulateObservableIssue()
    {
        IObservable<int> source = Observable.Create<int>(async (observer) =>
        {
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"Source publishing {i}");
                observer.OnNext(i);
                await Task.Delay(1000);
            }

            observer.OnCompleted();

            return Disposable.Create(() => Console.WriteLine("Observable is disposed"));
        });

        var multiSource = source.Publish().RefCount();

        var subscription = multiSource.Subscribe(x => Console.WriteLine("Observer received: " + x));

        await Task.Delay(3000);

        subscription.Dispose();

        Console.WriteLine("Subscription disposed");

    }
}

и консольный вывод

Почему после 'subscription.Dispose ()' наблюдаемая все еще пытается произвести данные?

Спасибо!

1 Ответ

0 голосов
/ 22 апреля 2019

Ваша source наблюдаемая не соответствует упомянутому вами контракту с наблюдаемой. Если вы замените source на это:

    var source = Observable.Interval(TimeSpan.FromSeconds(1))
        .Do(i => Console.WriteLine($"Source publishing {i}"), () => Console.WriteLine("Observable is disposed"))
        .Take(10);

... вы увидите, что он работает как задумано.

Что касается причин, подумайте о наблюдаемой, имеющей две фазы: Подписаться и Наблюдать. Код, который происходит во время подписки, всегда происходит независимо от отмены подписки. Observable.Create код - это код подписки.

Наблюдаемый мною наблюдаемый код - это весь наблюдаемый код (как и должно быть большинство наблюдаемого). Таким образом, он соответствующим образом реагирует на отмену подписки.

...