Вот пример программы, которая делает две подписки на консольный ввод (источник, наблюдаемый здесь не имеет значения). В первой подписке используется Observable.SelectMany, а во второй - аналогичный оператор SelectMany, который внутренне использует пакет System.Threading.Tasks.Dataflow . Исключение выдается для определенных входных данных в каждом из них. Исключение корректно пересылается в Observer onError, который перебрасывает его в стандартной реализации подписки. Наблюдаемое поведение состоит в том, что в случае исключения в SelectMany процесс продолжает выполняться, а в случае исключения в SelectManyPreseveOrder процесс завершается с необработанным исключением. В чем причина разного поведения? Есть ли способ добиться «более дружественного» поведения в операторе SelectManyPreserveOrder? Это консольное приложение .net 4.6.1, использующее Rx.Linq 2.2.5 и System.Threading.Tasks.Dataflow 4.10.0:
class Program
{
static async Task Main()
{
AppDomain.CurrentDomain.UnhandledException += (sender, args) => Console.WriteLine("App domain unhandled exception");
TaskScheduler.UnobservedTaskException += (sender, args) => Console.WriteLine("Unobserved task exception");
var consoleInput = Helper.ConsoleInput();
consoleInput.SelectMany(async input =>
{
await Task.Delay(50).ConfigureAwait(false);
if (input == "1")
throw new Exception("This exception is swallowed");
return input;
})
.Subscribe(s => Console.WriteLine($"SelectMany: {s}"));
consoleInput.SelectManyPreserveOrder(async input =>
{
await Task.Delay(50).ConfigureAwait(false);
if (input == "2")
throw new Exception("This exception kills the process");
return input;
})
.Subscribe(s => Console.WriteLine($"SelectMany (TPL Dataflow): {s}"));
await Task.Delay(TimeSpan.FromMinutes(10)).ConfigureAwait(false);
}
}
public static class ObservableExtension
{
public static IObservable<TResult> SelectManyPreserveOrder<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector, int maxParallelBatches = 1)
{
return source.FromTplDataflow(() =>
new TransformBlock<TSource, TResult>(selector,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelBatches }));
}
public static IObservable<TResult> FromTplDataflow<T, TResult>(
this IObservable<T> source, Func<IPropagatorBlock<T, TResult>> blockFactory)
{
return Observable.Defer(() =>
{
var block = blockFactory();
return Observable.Using(() =>
{
var disposable = source.Subscribe(block.AsObserver());
return Disposable.Create(dispose: () => disposable.Dispose());
}, r => block.AsObservable());
});
}
}
public static class Helper
{
public static IObservable<string> ConsoleInput()
{
return
Observable
.FromAsync(() => Console.In.ReadLineAsync())
.Repeat()
.Publish()
.RefCount()
.SubscribeOn(Scheduler.Default);
}
}
Интересно, что обработчик UnobservedTaskException никогда не вызывается.