Rx делает замечательную работу по набору ошибок и их устранению, когда происходит одно из следующих действий:
- Наблюдаемое отменяется (посредством удаления подписки)
- Наблюдаемое завершение.
- Наблюдаемое совершает ошибку.
Но для этого вся вычислительная цепочка должна находиться под контролем наблюдаемой подписки.
В вашем коде этого не произошло.
Observable.FromAsync
принимает Func<Task<T>>
и возвращает и IObservable<T>
.Ваш Observable.FromAsync
возвращает IObservable<Task>
, что означает, что ваш параметр возвращает Task<Task>
.И это то, что делает Task.WhenAny(Do1(), Do2())
.Это означает, что внутренняя задача Task<Task>
не находится под контролем наблюдаемой подписки.
Чтобы исправить код, вы можете сделать две вещи.
Первая - исправитьTPL, так что он находится под контролем наблюдаемого.Это значит сделать его отменяемым, а также распаковать Task.WhenAny
.
Вот новые Do
методы:
private static Task Do1(CancellationToken ct)
{
return Task.Run(async () =>
{
await Task.Delay(500, ct);
throw new AccessViolationException("Oops 1");
}, ct);
}
private static Task Do2(CancellationToken ct)
{
return Task.Run(async () =>
{
await Task.Delay(600, ct);
throw new AccessViolationException("Oops 2");
}, ct);
}
Теперь вы можете написать свой код следующим образом:
using (
Observable
.FromAsync(ct => Task.WhenAny(Do1(ct), Do2(ct)).Unwrap())
.Subscribe(
x => { },
ex => Console.WriteLine($"Error {ex}"),
() => Console.WriteLine("Completed Inner")))
{
Console.WriteLine("Press any key to dispose");
Console.ReadLine();
}
Теперь это работает как положено.
Второе, что вы можете сделать, это не писать это с помощью задач.Используйте чистый код Rx.
Вот альтернатива Do
s:
private static IObservable<Unit> Do1() =>
from x in Observable.Timer(TimeSpan.FromMilliseconds(500))
from y in Observable.Throw<AccessViolationException>(new AccessViolationException("Oops 1"))
select Unit.Default;
private static IObservable<Unit> Do2() =>
from x in Observable.Timer(TimeSpan.FromMilliseconds(600))
from y in Observable.Throw<AccessViolationException>(new AccessViolationException("Oops 2"))
select Unit.Default;
Вот основной код:
using (
Observable
.Amb(Do1(), Do2())
.Subscribe(
x => { },
ex => Console.WriteLine($"Error {ex}"),
() => Console.WriteLine("Completed Inner")))
{
Console.WriteLine("Press any key to dispose");
Console.ReadLine();
}
Это тоже работает.
Я предпочитаю Rx способ сделать это.