Правильный способ завершить наблюдаемый поток по ошибке - PullRequest
1 голос
/ 07 февраля 2020

У меня есть несколько методов, которые возвращают IObservable. Во всех случаях я настраивал запрос, который приведет к завершению возвращаемой наблюдаемой. Обычно я использую метод расширения TakeUntil. Наблюдаемый тип, который я использую в TakeUntil, содержит флаг, который сообщает мне, была ли проблема. Как я могу использовать это, чтобы мои возвращаемые наблюдаемые заканчивались ошибкой? Я хотел бы перегрузку TakeUntil, которая позволила наблюдаемой завершиться с ошибкой.

В настоящее время я взломал метод, чтобы вернуть субъект, который подписался на наблюдаемый запрос, а также подписался на другую наблюдаемую, которую я использую в TakeUntil для либо вызовите OnCompleted или OnError. Я понимаю, что это плохой план, но что мне делать? Любая помощь высоко ценится.

Ответы [ 2 ]

0 голосов
/ 08 февраля 2020

Тебе не нужно ничего делать. То, что вы просите, уже встроено.

Если вы начнете с этого кода:

var source = Observable.Interval(TimeSpan.FromSeconds(1.0));
var ender = new Subject<Unit>();
var query = source.TakeUntil(ender);
query.Subscribe(x => Console.WriteLine(x));

Тогда вам нужно будет позвонить ender.OnError(new Exception("My exception")); только для source, наблюдаемой для конец с ошибкой за исключением new Exception("My exception").

0 голосов
/ 08 февраля 2020

В отличие от TakeUntil, вы должны выбрать один из трех случаев, соответствующих OnNext, OnError и OnCompleteted. И уже есть встроенный тип, который фиксирует его в Notification<T>.

Все, что нам нужно сделать, - это преобразовать конкретные уведомления в неявные - с помощью оператора Dematerialize.

Вот пример потока, который выдает, если значение 10 встречается в потоке, но завершает его. если любое значение>> 9.

        var errorAt10 =
            values.Select(value =>
            {
                if (value == 10)
                    return Notification.CreateOnError<long>(new Exception());

                if (value >= 9)
                    return Notification.CreateOnCompleted<long>();

                return Notification.CreateOnNext(value);
            })
            .Dematerialize();

Мы могли бы упростить это, если бы хотели:

    public static IObservable<T> NotifyAs<T>(this IObservable<T> source, Func<T, NotificationKind> choice, Exception exception = default)
    {
        return source.Select(value =>
        {
            switch (choice(value))
            {
                case NotificationKind.OnError:
                    return Notification.CreateOnError<T>(exception ?? new Exception());
                case NotificationKind.OnCompleted:
                    return Notification.CreateOnCompleted<T>();
                default:
                    return Notification.CreateOnNext(value);
            }

        })
        .Dematerialize();
    }

Теперь вы можете переписать предыдущий пример как:

        var errorAt10 =
            values.NotifyAs(value =>
                value == 10 ? NotificationKind.OnError :
                value >= 9 ? NotificationKind.OnCompleted :
                NotificationKind.OnNext
            );
...