И Джеймс, и Ричард сделали несколько хороших замечаний, но я не думаю, что они дали вам лучший способ решения вашей проблемы.
Джеймс предложил использовать .Catch(Observable.Never<Unit>())
.Он был неправ, когда сказал, что «позволит потоку продолжаться», потому что как только вы нажмете исключение, поток должен закончиться - на это указал Ричард, упомянув контракт между наблюдателями и наблюдаемыми.
Кроме того, использование Never
таким образом приведет к тому, что ваши наблюдаемые никогда не завершатся.
Короткий ответ: .Catch(Observable.Empty<Unit>())
- это правильный способ изменить последовательность с той, которая заканчивается ошибкой, на ту, котораязаканчивается завершением.
Вы натолкнулись на правильную идею использования SelectMany
для обработки каждого значения исходной коллекции, чтобы вы могли перехватить каждое исключение, но у вас осталась пара проблем.
Вы используете задачи (TPL) просто для превращения вызова функции в наблюдаемую.Это заставляет ваше наблюдаемое использовать потоки пула задач, что означает, что оператор SelectMany
, скорее всего, будет генерировать значения в недетерминированном порядке.
Также вы скрываете фактические вызовы для обработки ваших данных, что усложняет рефакторинг и обслуживание.
Я думаю, вам лучше создать метод расширения, позволяющий пропускать исключения.Вот он:
public static IObservable<R> SelectAndSkipOnException<T, R>(
this IObservable<T> source, Func<T, R> selector)
{
return
source
.Select(t =>
Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
.Merge();
}
С помощью этого метода вы можете теперь просто сделать это:
var result =
collection.ToObservable()
.SelectAndSkipOnException(t =>
{
var a = DoA(t);
var b = DoB(a);
var c = DoC(b);
return c;
});
Этот код намного проще, но он скрывает исключение (я).Если вы хотите придерживаться исключений, продолжая свою последовательность, то вам нужно сделать несколько дополнительных действий.Добавление нескольких перегрузок в метод расширения Materialize
позволяет избежать ошибок.
public static IObservable<Notification<R>> Materialize<T, R>(
this IObservable<T> source, Func<T, R> selector)
{
return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}
public static IObservable<Notification<R>> Materialize<T, R>(
this IObservable<Notification<T>> source, Func<T, R> selector)
{
Func<Notification<T>, Notification<R>> f = nt =>
{
if (nt.Kind == NotificationKind.OnNext)
{
try
{
return Notification.CreateOnNext<R>(selector(nt.Value));
}
catch (Exception ex)
{
ex.Data["Value"] = nt.Value;
ex.Data["Selector"] = selector;
return Notification.CreateOnError<R>(ex);
}
}
else
{
if (nt.Kind == NotificationKind.OnError)
{
return Notification.CreateOnError<R>(nt.Exception);
}
else
{
return Notification.CreateOnCompleted<R>();
}
}
};
return source.Select(nt => f(nt));
}
Эти методы позволяют записать следующее:
var result =
collection
.ToObservable()
.Materialize(t =>
{
var a = DoA(t);
var b = DoB(a);
var c = DoC(b);
return c;
})
.Do(nt =>
{
if (nt.Kind == NotificationKind.OnError)
{
/* Process the error in `nt.Exception` */
}
})
.Where(nt => nt.Kind != NotificationKind.OnError)
.Dematerialize();
Вы можете даже связать эти Materialize
методы и использование ex.Data["Value"]
& ex.Data["Selector"]
для получения значения и функции выбора, которая выдает ошибку.
Надеюсь, это поможет.