Реактивная наблюдаемая утилизация подписки - PullRequest
19 голосов
/ 09 октября 2011

Если у меня есть доступ к IObservable, который, как я знаю, когда-нибудь вернет только один элемент, сработает ли это и является ли это наилучшим шаблоном использования?

Ответы [ 4 ]

51 голосов
/ 10 октября 2011

Одноразовое значение, возвращаемое методами расширения Subscribe, возвращается исключительно для того, чтобы вы могли вручную отписаться от наблюдаемой до , наблюдаемое естественным образом заканчивается.

Если наблюдаемое завершается - либоOnCompleted или OnError - тогда подписка уже выбрана для вас.

Попробуйте этот код:

var xs = Observable.Create<int>(o =>
{
    var d = Observable.Return(1).Subscribe(o);
    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));

Если вы запустите выше, вы увидите, что "Disposed!"записывается в консоль, когда наблюдаемое завершается без необходимости вызова .Dispose() для подписки.

Одна важная вещь, на которую следует обратить внимание: сборщик мусора никогда не вызывает .Dispose() для наблюдаемых подписок, поэтому вы должны утилизируйте ваши подписки, если они не закончились (или могут не иметь) закончились до того, как ваша подписка выйдет за рамки.

Возьмите это, например:

var wc = new WebClient();

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h);

var subscription =
    ds.Subscribe(d =>
        Console.WriteLine(d.EventArgs.Result));

The ds observable будет подключаться к обработчику событий только тогда, когда у него есть подписка, и будет отключаться только после завершения наблюдения или удаления подписки.Поскольку это обработчик событий, наблюдаемое никогда не завершится, потому что оно ожидает большего количества событий, и, следовательно, удаление - единственный способ отсоединиться от события (для приведенного выше примера).

Когда у вас есть FromEventPattern Заметим, что вы знаете, что когда-либо будет возвращать только одно значение, тогда разумно добавить метод расширения .Take(1) перед подпиской, чтобы позволить обработчику событий автоматически отключаться, и тогда вам не нужно вручную удалять подписку.

Вот так:

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h)
    .Take(1);

Надеюсь, это поможет.

12 голосов
/ 09 октября 2011

Отказ от ответственности: Я также все еще изучаю Rx.Так что я на самом деле не эксперт, но я верю, что одноразовый товар, возвращенный Subscribe, только отменит подписку.Также, если источник завершается, как в вашем случае, отмена подписки выполняется автоматически.Поэтому я думаю, что Dispose существует избыточно и может быть безопасно удалено.

См. Ответ на этот вопрос для получения дополнительной информации.

4 голосов
/ 10 октября 2011

Функция Take сделает именно то, что вы ищете. В этом случае Take(1).

0 голосов
/ 26 октября 2016

В отличие от некоторых комментариев, совсем нередко распоряжаться подпиской изнутри OnNext.

Хотя верно, что удаление в OnCompleted и OnError выполняется для вас завернутымподписку, которую создает метод расширения Subscribe, вы можете отменить подписку на основе значения, которое вы наблюдаете (как в вашем случае: 1-е).У вас не всегда есть наблюдаемое, которое, как известно, производит только одно значение.

Проблема в том, что вы получаете IDisposable только после того, как подписались.Наблюдаемый может перезвонить вам на OnNext даже прежде, чем он вернет вам IDisposable для отказа от подписки (в зависимости от того, что использует IScheduler).

В этом случае пригодится System.Reactive.Disposables.SingleAssignmentDisposable,Он оборачивает IDisposable, который вы можете назначить поздно, и сразу же утилизирует его при назначении, если к этому времени уже был удален SingleAssignmentDisposable.Также оно имеет свойство IsDisposed, которое изначально равно false и имеет значение true, когда вызывается Dispose().

Итак:

IObservable<string> source = ...;

var subscription = new SingleAssignmentDisposable();
subscription.Disposable = source.Subscribe(x =>
{
    if (subscription.IsDisposed) // getting notified though I've told it to stop
        return;
    DoThingsWithItem(x);
    if (x == "the last item I'm interested in")
        subscription.Dispose();
});
...