Обработка ошибок в наблюдаемой последовательности с использованием Rx - PullRequest
12 голосов
/ 19 мая 2011

Есть ли способ иметь наблюдаемую последовательность, чтобы возобновить выполнение со следующим элементом в последовательности, если происходит ошибка? Из этого поста похоже, что вам нужно указать новую наблюдаемую последовательность в Catch (), чтобы возобновить выполнение, но что если вам нужно просто продолжить обработку следующего элемента в последовательности? Есть ли способ добиться этого?

UPDATE: Сценарий таков: У меня есть куча элементов, которые мне нужно обработать. Обработка состоит из нескольких шагов. я имею разложил шаги на задачи, которые я хотел бы составить. Я следовал рекомендациям для ToObservable (), опубликованным здесь конвертировать по задачам в наблюдаемые для композиции. так что в основном я делаю что-то вроде этого -

foreach(element in collection)
{
   var result = from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;
   result.subscribe( register on next and error handlers here)
 }

или я мог бы что-то вроде этого:

var result = 
        from element in collection.ToObservable() 
        from aResult in DoAAsync(element).ToObservable() 
         from bResult in DoBAsync(aResult).ToObservable() 
         from cResult in DoCAsync(bResult).ToObservable() 
         select cResult;

Каков наилучший способ продолжить обработку других элементов, даже если, скажем, обработка один из элементов выдает исключение. Я хотел бы иметь возможность регистрировать ошибку и двигаться в идеале.

Ответы [ 2 ]

12 голосов
/ 26 июля 2011

И Джеймс, и Ричард сделали несколько хороших замечаний, но я не думаю, что они дали вам лучший способ решения вашей проблемы.

Джеймс предложил использовать .Catch(Observable.Never<Unit>()).Он был неправ, когда сказал, что «позволит потоку продолжаться», потому что как только вы нажмете исключение, поток должен закончиться - на это указал Ричард, упомянув контракт между наблюдателями и наблюдаемыми.

Кроме того, использование Never таким образом приведет к тому, что ваши наблюдаемые никогда не завершатся.

Короткий ответ: .Catch(Observable.Empty<Unit>()) - это правильный способ изменить последовательность с той, которая заканчивается ошибкой, на ту, котораязаканчивается завершением.

Вы натолкнулись на правильную идею использования SelectMany для обработки каждого значения исходной коллекции, чтобы вы могли перехватить каждое исключение, но у вас осталась пара проблем.

Вы используете задачи (TPL) просто для превращения вызова функции в наблюдаемую.Это заставляет ваше наблюдаемое использовать потоки пула задач, что означает, что оператор SelectMany, скорее всего, будет генерировать значения в недетерминированном порядке.

Также вы скрываете фактические вызовы для обработки ваших данных, что усложняет рефакторинг и обслуживание.

Я думаю, вам лучше создать метод расширения, позволяющий пропускать исключения.Вот он:

public static IObservable<R> SelectAndSkipOnException<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return
        source
            .Select(t =>
                Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
            .Merge();
}

С помощью этого метода вы можете теперь просто сделать это:

var result =
    collection.ToObservable()
        .SelectAndSkipOnException(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        });

Этот код намного проще, но он скрывает исключение (я).Если вы хотите придерживаться исключений, продолжая свою последовательность, то вам нужно сделать несколько дополнительных действий.Добавление нескольких перегрузок в метод расширения Materialize позволяет избежать ошибок.

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<T> source, Func<T, R> selector)
{
    return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}

public static IObservable<Notification<R>> Materialize<T, R>(
    this IObservable<Notification<T>> source, Func<T, R> selector)
{
    Func<Notification<T>, Notification<R>> f = nt =>
    {
        if (nt.Kind == NotificationKind.OnNext)
        {
            try
            {
                return Notification.CreateOnNext<R>(selector(nt.Value));
            }
            catch (Exception ex)
            {
                ex.Data["Value"] = nt.Value;
                ex.Data["Selector"] = selector;
                return Notification.CreateOnError<R>(ex);
            }
        }
        else
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                return Notification.CreateOnError<R>(nt.Exception);
            }
            else
            {
                return Notification.CreateOnCompleted<R>();
            }
        }
    };
    return source.Select(nt => f(nt));
}

Эти методы позволяют записать следующее:

var result =
    collection
        .ToObservable()
        .Materialize(t =>
        {
            var a = DoA(t);
            var b = DoB(a);
            var c = DoC(b);
            return c;
        })
        .Do(nt =>
        {
            if (nt.Kind == NotificationKind.OnError)
            {
                /* Process the error in `nt.Exception` */
            }
        })
        .Where(nt => nt.Kind != NotificationKind.OnError)
        .Dematerialize();

Вы можете даже связать эти Materialize методы и использование ex.Data["Value"] & ex.Data["Selector"] для получения значения и функции выбора, которая выдает ошибку.

Надеюсь, это поможет.

1 голос
/ 19 мая 2011

Контракт между IObservable и IObserver является OnNext*(OnCompelted|OnError)?, который поддерживается всеми операторами, даже если не источником.

Ваш единственный выбор - повторно подписаться на источник, используя Retry, но если источник возвращает IObservable экземпляр для каждого описания, вы не увидите никаких новых значений.

Не могли бы вы предоставить больше информации о вашем сценарии?Может быть, есть другой способ взглянуть на это.

Редактировать: Судя по вашему обновленному отзыву, кажется, что вам просто нужно Catch:

var result = 
    from element in collection.ToObservable() 
    from aResult in DoAAsync(element).ToObservable().Log().Catch(Observable.Empty<TA>())
    from bResult in DoBAsync(aResult).ToObservable().Log().Catch(Observable.Empty<TB>()) 
    from cResult in DoCAsync(bResult).ToObservable().Log().Catch(Observable.Empty<TC>())
    select cResult;

Этозаменяет ошибку на Empty, которая не запускает следующую последовательность (поскольку она использует SelectMany под капотом.

...