Subject.OnError throwing - PullRequest
       13

Subject.OnError throwing

0 голосов
/ 11 января 2019

Я использую Rx .NET Subject.OnError, и, похоже, он генерирует, а не распространяет исключение. Мой сценарий состоит в том, что Субъект получает данные в отдельном потоке, и вызывающий поток должен что-то делать, когда эти данные возвращаются, а также ожидать завершения всех данных из наблюдаемой, а также распространять любые возникающие исключения.

Вот упрощенный пример:

class Program
{
    static async Task Main(string[] args)
    {
        var subject = new Subject<bool>();

        Task.Run(async () =>
        {
            await Task.Delay(5000);
            subject.OnError(new Exception()); //This call is throwing!
        });

        subject.Subscribe(e =>
        {
            //Do some data processing here
        });

        try
        {
            //Need to wait for observable to complete before returning to the caller
            await subject.LastOrDefaultAsync();
        }
        catch
        {
            //Do some logging, clean up resources
            throw;
        }
    }
}

Если я уберу вызов subject.Subscribe(), код будет работать так, как вы ожидаете, и исключение будет переброшено на subject.LastOrDefaultAsync(). Однако при наличии Subscribe вызов subject.OnError() немедленно вызывает повторное исключение на месте (не передавая его наблюдаемому), что мне кажется совершенно странным.

Как мне решить эту проблему?

(К вашему сведению, тонна кода уже написана с использованием Subject, поэтому предложение не использовать его вообще не является приемлемым решением)

1 Ответ

0 голосов
/ 11 января 2019

Вот более простой пример:

void Main()
{
    var subject = new Subject<bool>();
    subject.Subscribe(b => {/* bool handling code */});
    subject.OnError(new Exception()); //This call is throwing!
}

Эта перегрузка Subscribe перебрасывает получаемые исключения. Если вы хотите игнорировать исключения, сделайте следующее:

void Main()
{
    var subject = new Subject<bool>();
    // subject.Subscribe();
    subject.Subscribe(b => {/* bool handling code */}, e => { });
    subject.OnError(new Exception()); //This call is throwing!

}

Если вы хотите увидеть этот источник, посмотрите здесь: https://github.com/dotnet/reactive/blob/master/Rx.NET/Source/src/System.Reactive/Observable.Extensions.cs (строка 63). Выдает любые пойманные исключения.


РЕДАКТИРОВАТЬ :

Если вы хотите погрузиться в кроличью нору, вот (эффективно) код обработки исключений, который в итоге вызывается с перегрузкой .Subscribe(onNextHandlerOnly):

void Main()
{
    var subject = new Subject<bool>();
    subject.Subscribe(b => b.Dump(), e => { e.Throw(); }, () => {});
    subject.OnError(new Exception()); //This call is throwing!

}

public static class X
{
    public static void Throw(this Exception exception)
    {
         System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw();
    }
}

При вызове EDI.Capture создается впечатление, что «источником» исключения является вызов OnError, а не Subscribe.

...