Перехват исключений, которые могут быть выброшены из действия подписки OnNext - PullRequest
8 голосов
/ 27 августа 2011

Я немного новичок в Rx.NET. Можно ли поймать исключение, которое может быть выдано любым из подписчиков? Возьми следующее ...

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });

В настоящее время я ловлю на основе подписки с экземпляром следующего. Реализация которого просто использует ManualResetEvent для пробуждения ожидающего потока.

public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}

и используя его вот так ...

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });

Я чувствую, что должен быть какой-то лучший способ. Все ли возможности обработки ошибок в Rx.NET специально предназначены для работы с наблюдаемыми ошибками?

РЕДАКТИРОВАТЬ: По запросу моя реализация https://gist.github.com/1409829 (интерфейс и реализация разделены на различные сборки в коде продукта). Обратная связь приветствуется. Это может показаться глупым, но я использую Castle Windsor для управления многими различными абонентами Rx. Эта ловушка исключений зарегистрирована в контейнере, как этот

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));

Затем он будет использоваться следующим образом, где observable является экземпляром IObservable:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);

1 Ответ

8 голосов
/ 27 августа 2011

Встроенные операторы Observable не выполняют то, что вы запрашиваете по умолчанию (очень похоже на события), но вы можете создать метод расширения, который бы это делал.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception
{
    return Observable.CreateWithDisposable<T>(
        o => source.Subscribe(
            v => { try { o.OnNext(v); }
                   catch (TException) { }
            },
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));
}

Тогда любой наблюдаемый может быть обернут этим методом, чтобы получить поведение, которое вы описали.

...