Как объединить две наблюдаемые, чтобы результат завершался, когда завершается любая из наблюдаемых? - PullRequest
6 голосов
/ 03 февраля 2011

У меня есть этот код:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();
var ss = s1.Merge(s2).Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());
s1.OnCompleted(); // I wish ss finished here.
s2.OnCompleted(); // Yet it does so here. =(

Я решил свою проблему с помощью OnError (new OperationCanceledException ()), но я бы хотел лучшее решение (должен быть комбинатор, верно?).

Ответы [ 4 ]

7 голосов
/ 04 февраля 2011

Вместо перезаписи Merge для завершения, когда завершается любой поток, я бы предложил преобразовать события onCompleted в события onNext и использовать var ss = s1.Merge(s2).TakeUntil(s1ors2complete), где s1ors2complete выдает значение, когда заканчивается s1 или s2.Вы также можете просто связать .TakeUntil(s1completes).TakeUntil(s2completes) вместо создания s1ors2complete.Этот подход обеспечивает лучшую композицию, чем расширение MergeWithCompleteOnEither, поскольку его можно использовать для преобразования любого оператора «завершено при завершении» в оператор «завершено при любом завершении».

Как преобразовать события onNext в события onCompletedЕсть несколько способов сделать это.Метод CompositeDisposable звучит как хороший подход, и после небольшого поиска эта интересная тема о конвертируется между onNext, onError и onCompleted уведомлениями .Вероятно, я бы создал метод расширения ReturnTrueOnCompleted, используя xs.SkipWhile(_ => true).concat(Observable.Return(True)), и тогда ваше слияние станет:

var s1ors2complete = s1.ReturnTrueOnCompleted().Amb(s2.ReturnTrueOnCompleted());
var ss = s1.Merge(s2).TakeUntil(s1ors2complete).Finally(() => Console.WriteLine("Finished!"));

Вы также можете посмотреть на использование такого оператора, как Zip, который автоматически завершает , когда одиниз входных потоков завершается.

6 голосов
/ 03 февраля 2011

Или это, что тоже довольно аккуратно:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        return Observable.CreateWithDisposable<T>(obs =>
        {
            var compositeDisposable = new CompositeDisposable();
            var subject = new Subject<T>();

            compositeDisposable.Add(subject.Subscribe(obs));
            compositeDisposable.Add(source.Subscribe(subject));
            compositeDisposable.Add(right.Subscribe(subject));


            return compositeDisposable;

        });     
    }
}

При этом используется субъект, который будет гарантировать, что только один OnCompleted будет передан наблюдателю в CreateWithDisposable ();

2 голосов
/ 03 февраля 2011

Предполагая, что вам не нужен вывод ни одного из потоков, вы можете использовать Amb в сочетании с магией Materialize:

var s1 = new Subject<Unit>();
var s2 = new Subject<Unit>();

var ss = Observable.Amb(
        s1.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted), 
        s2.Materialize().Where(x => x.Kind == NotificationKind.OnCompleted)
    )
    .Finally(() => Console.WriteLine("Finished!"));

ss.Subscribe(_ => Console.WriteLine("Next"));

s1.OnNext(new Unit());
s2.OnNext(new Unit());

s1.OnCompleted(); // ss will finish here and s2 will be unsubscribed from

Если вам нужны значения, вы можете использовать Do по двум предметам.

0 голосов
/ 03 февраля 2011

Попробуйте это:

public static class Ext
{
    public static IObservable<T> MergeWithCompleteOnEither<T>(this IObservable<T> source, IObservable<T> right)
    {
        var completed = Observable.Throw<T>(new StreamCompletedException());

        return 
            source.Concat(completed)
            .Merge(right.Concat(completed))
            .Catch((StreamCompletedException ex) => Observable.Empty<T>());

    }

    private sealed class StreamCompletedException : Exception
    {
    }
}

То, что это делает, - это конкатенация IObservable, которая будет генерировать исключение, когда завершается либо источник, либо правильный источник. Затем мы можем использовать метод расширения Catch для возврата пустой Observable для автоматического завершения потока при завершении любого из них.

...