Как насчет этого:
class Program
{
public event EventHandler<EventArgs> MyEvent;
static void Main(string[] args)
{
var myClass = new Program();
var task = new Task(() =>
{
for(var i=0; i<5; i++) {
// Generate standard .NET event.
myClass.MyEvent(myClass, new EventArgs());
}
throw new Exception();
});
var obsTask = task.ToObservable();
var events = Observable.FromEvent<EventArgs>(h => myClass.MyEvent += h, h => myClass.MyEvent -= h);
events.TakeUntil(obsTask).Subscribe(
ev => DoSomethingOnNext(ev),
ex => DoSomethingOnError(ex),
() => DoSomethingOnCompleted());
task.Start();
Console.ReadKey();
}
private static void DoSomethingOnCompleted()
{
Console.WriteLine("DoSomethingOnCompleted");
}
private static void DoSomethingOnError(Exception ex)
{
Console.WriteLine("DoSomethingOnError:" + ex.ToString());
}
private static void DoSomethingOnNext(IEvent<EventArgs> ev)
{
Console.WriteLine("DoSomethingOnNext:" + ev.ToString());
}
Вывод:
DoSomethingOnNext: System.Collections.Generic.Event 1[System.EventArgs]
DoSomethingOnNext:System.Collections.Generic.Event
1 [System.EventArgs] DoSomethingOnNext: System.Collections.Generic.Event 1[System.EventArgs]
DoSomethingOnNext:System.Collections.Generic.Event
1 [System.EventArgs] DoSomethingOnNext: System.Collections.Generic.Event`1 [System.EventArgs] DoSomethingOnError: System.AggregateException: произошла одна или несколько ошибок.---> System.Exception: было сгенерировано исключение типа System.Exception.в RxDisposeTests.Program. <> c_ DisplayClass9.b _0 () в C: \ Users \ Richard.Hein \ Documents \ Visual Studio 2010 \ Projects \ RxConsole \ RxDisposeTests \ Program.cs: строка 25 в системе.Threading.Tasks.Task.InnerInvoke () в System.Threading.Tasks.Task.Execute () --- Конец внутренней трассировки стека исключений --- ---> (Внутреннее исключение # 0) System.Exception: ИсключениеТип «System.Exception» был брошен.в RxDisposeTests.Program. <> c_ DisplayClass9.b _0 () в C: \ Users \ Richard.Hein \ Documents \ Visual Studio 2010 \ Projects \ RxConsole \ RxDisposeTests \ Program.cs: строка 25 в системе.Threading.Tasks.Task.InnerInvoke () в System.Threading.Tasks.Task.Execute () <--- </p>
РЕДАКТИРОВАТЬ:
Не уверен, если TakeUntilявляется хорошим решением, потому что Task может возвращать что-то кроме Исключений, верно?Так что это может сработать:
var events = Observable.CreateWithDisposable<IEvent<EventArgs>>(observer =>
{
var eventObs = Observable.FromEvent<EventArgs>(
h => myClass.MyEvent += h, h => myClass.MyEvent -= h);
task.ToObservable().Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
return eventObs.Subscribe(observer.OnNext, observer.OnError, observer.OnCompleted);
});
events.Subscribe(
ev => DoSomethingOnNext(ev),
ex => DoSomethingOnError(ex),
() => DoSomethingOnCompleted());