Я использую Rx .NET Subject.OnError
, и, похоже, он генерирует, а не распространяет исключение. Мой сценарий состоит в том, что Субъект получает данные в отдельном потоке, и вызывающий поток должен что-то делать, когда эти данные возвращаются, а также ожидать завершения всех данных из наблюдаемой, а также распространять любые возникающие исключения.
Вот упрощенный пример:
class Program
{
static async Task Main(string[] args)
{
var subject = new Subject<bool>();
Task.Run(async () =>
{
await Task.Delay(5000);
subject.OnError(new Exception()); //This call is throwing!
});
subject.Subscribe(e =>
{
//Do some data processing here
});
try
{
//Need to wait for observable to complete before returning to the caller
await subject.LastOrDefaultAsync();
}
catch
{
//Do some logging, clean up resources
throw;
}
}
}
Если я уберу вызов subject.Subscribe()
, код будет работать так, как вы ожидаете, и исключение будет переброшено на subject.LastOrDefaultAsync()
. Однако при наличии Subscribe
вызов subject.OnError()
немедленно вызывает повторное исключение на месте (не передавая его наблюдаемому), что мне кажется совершенно странным.
Как мне решить эту проблему?
(К вашему сведению, тонна кода уже написана с использованием Subject
, поэтому предложение не использовать его вообще не является приемлемым решением)